# MongoDB equivalent to FOR UPDATE SKIP LOCKED
2026-01-01
`SELECT ... FOR UPDATE SKIP LOCKED` is a vendor-specific SQL feature available in several relational databases (e.g., PostgreSQL, Oracle, MySQL). It lets parallel workers claim rows without waiting on locked ones. MongoDB's concurrency model uses optimistic concurrency: reads don't block writes, and writes don't block reads. To coordinate parallel processing, you can reserve a document by writing a lock field so that other workers skip it.

I'll use an example discussed in the Reddit question "ACID read then write – Python":

> Client in Python, multi-process. Each process picks and reads one document, calls some public APIs, adds data to the document, and saves it. Then the next document. What is written can depend on the read data. The question is: in Python, how can I create and configure transactions in the code to make sure no other process can read or write its current document from the moment a process starts reading it until it is done writing its additional data? This means concurrent reads should not happen…

In this example, I'll process messages based on their originating IP address. Multiple threads will enrich them with location data fetched from the public API at https://ip-api.com/.
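For reference, here is a quick standalone call to that endpoint, separate from the script shown later, to inspect the shape of the response it returns:

```python
import requests

# One-off call to ip-api.com's free JSON endpoint to see the response shape
resp = requests.get("http://ip-api.com/json/1.1.1.1", timeout=10)
print(resp.json())
# e.g. {'status': 'success', 'country': 'Hong Kong', ..., 'query': '1.1.1.1'}
```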
Here is an example of an initial document:

```
{
  _id: ObjectId('6956e772baea71e37a818e73'),
  originatingIp: '1.1.1.1',
  location: null
}
```

Here is the document while it is being processed:
```
{
  _id: ObjectId('6956e772baea71e37a818e73'),
  originatingIp: '1.1.1.1',
  location: null,
  lock: {
    by: 'franck',
    until: datetime.datetime(2026, 1, 1, 22, 33, 10, 833000)
  }
}
```

Here is the same document after processing:
```
{
  _id: ObjectId('6956e772baea71e37a818e73'),
  originatingIp: '1.1.1.1',
  location: {
    status: 'success',
    country: 'Hong Kong',
    countryCode: 'HK',
    region: 'HCW',
    regionName: 'Central and Western District',
    city: 'Hong Kong',
    zip: '',
    lat: 22.3193,
    lon: 114.1693,
    timezone: 'Asia/Hong_Kong',
    isp: 'Cloudflare, Inc',
    org: 'APNIC and Cloudflare DNS Resolver project',
    as: 'AS13335 Cloudflare, Inc.',
    query: '1.1.1.1'
  }
}
```

Storing in-process information in the document avoids long transactions, which hide the current status and make troubleshooting difficult when the public API is slow.

## Design

The script below is a complete, runnable demonstration of how to implement `SELECT ... FOR UPDATE SKIP LOCKED`-style parallel job claiming in MongoDB. It generates everything it needs, processes it, and shows the end state.

This solution avoids explicit transactions, which is preferable because they would include a call to a public API with unpredictable response time. It also avoids `findOneAndUpdate`, whose higher overhead comes from storing full pre- and post-images of documents for retryable operations. For large documents, possible in real workloads even if not shown in this demo, this would lead to significant write amplification. Finally, setting an expiration timestamp allows automatic re-processing if a message fails.
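For comparison, here is a minimal sketch of what a `find_one_and_update`-based claim could look like. The function name is hypothetical, and it assumes the same `messages` collection and lock fields as the script that follows. It locks and returns the document in a single round trip, at the cost of the image-storing overhead described above:

```python
from pymongo import ReturnDocument

# Hypothetical alternative claim: one round trip, but with the
# pre-/post-image overhead described above. Assumes the same
# 'messages' collection and lock fields as the script below.
def claim_with_find_one_and_update(messages, now, lock_expiry, worker_id):
    return messages.find_one_and_update(
        {"location": None,
         "$or": [{"lock": {"$exists": False}},
                 {"lock.until": {"$lt": now}}]},
        {"$set": {"lock": {"by": worker_id, "until": lock_expiry}}},
        return_document=ReturnDocument.AFTER,  # return the locked document
    )
```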
Below is the complete Python program, which you can test with different numbers of documents and threads:

```python
import os
import random
import socket
import threading
import time
from datetime import datetime, timedelta

import requests
from pymongo import MongoClient

# Mongo connection and collection
client = MongoClient("mongodb://127.0.0.1:27017/?directConnection=true")
db = client.test
messages = db.message

# Test settings (the test inserts documents, then runs the processing threads for some duration)
DOCUMENTS = 10  # number of documents created initially
THREADS = 5     # number of threads that loop to claim a document
SECONDS = 15    # a thread stops looping after this timeout

# Worker identity (to identify the thread, and set an expiration on the lock)
WORKER_ID = f"{socket.gethostname()}-{os.getpid()}"
# Assumes processing completes within this duration; if not, the document can be
# claimed by another worker, and this one will not update it
LOCK_DURATION = timedelta(seconds=60)

# Get the time
def utcnow():
    return datetime.utcnow()

MAX_CLOCK_SKEW = timedelta(seconds=1)  # used as a grace period when a lock is expired

# --- Prepare test messages (with randomly generated IPs) ---
def insert_test_docs():
    # Drop the collection completely (removes data + indexes)
    messages.drop()
    # Create the partial index for unprocessed docs (they have location = null)
    messages.create_index(
        [("lock.until", 1)],
        partialFilterExpression={"location": None}
    )
    # Generate random IPs for the test
    ips = [
        ".".join(str(random.randint(1, 255)) for _ in range(4))
        for _ in range(DOCUMENTS)
    ]
    # Explicitly set location=None to match the partial index filter
    docs = [
        {"originatingIp": ip, "location": None}  # a null location is the marker to process it
        for ip in ips
    ]
    messages.insert_many(docs)
    print(f"[STARTUP] Inserted {DOCUMENTS} test docs into 'message'")
    for doc in messages.find({}, {"_id": 0, "originatingIp": 1, "location": 1}):
        print(doc)

# --- Claim a message ---
def claim_document():
    now = utcnow()
    lock_expiry = now + LOCK_DURATION
    token = random.randint(1000, 9999)  # unique lock token for extra safety
    # Atomic lock claim: match unlocked documents, or steal expired locks
    result = messages.update_one(
        {
            "$and": [
                # the location is not set
                {"location": None},
                # the document is not locked, or the lock expired (including grace period)
                {"$or": [
                    {"lock": {"$exists": False}},
                    {"lock.until": {"$lt": now - MAX_CLOCK_SKEW}}
                ]}
            ]
        },
        {"$set": {"lock": {"by": WORKER_ID, "until": lock_expiry, "token": token}}}
    )
    if result.modified_count == 0:
        return None
    # Fetch exactly the doc we locked — match by worker, expiry, AND token
    doc = messages.find_one({
        "lock.by": WORKER_ID,
        "lock.until": lock_expiry,
        "lock.token": token
    })
    if doc:
        print(f"[{WORKER_ID}] {threading.current_thread().name} claimed IP {doc['originatingIp']} with token={token}")
    else:
        print(f"[{WORKER_ID}] {threading.current_thread().name} claim succeeded but fetch failed — possible race?")
    return doc

# --- Call the public API ---
def fetch_location(ip):
    url = f"http://ip-api.com/json/{ip}"
    try:
        resp = requests.get(url, timeout=30)
        if resp.status_code == 200:
            return resp.json()
        print(f"[API] Error: HTTP {resp.status_code} for {ip}")
        return None
    except Exception as e:
        print(f"[API] Exception for {ip}: {e}")
        return None

# --- Process messages in a loop ---
def process_document():
    start_time = time.time()
    timeout = SECONDS  # seconds
    thread_name = threading.current_thread().name
    while True:
        # Try to claim a doc
        doc = claim_document()
        if doc:
            # We successfully claimed a doc — process it
            ip = doc["originatingIp"]
            location_data = fetch_location(ip)
            if not location_data:
                print(f"[{WORKER_ID}] {thread_name} failed to fetch location for {ip}")
                return
            # Final update only if the lock is still valid
            now = utcnow()
            result = messages.update_one(
                {
                    "_id": doc["_id"],
                    "lock.by": WORKER_ID,
                    "lock.until": {"$gte": now},
                    "lock.token": doc["lock"]["token"]
                },
                {
                    "$set": {"location": location_data},
                    "$unset": {"lock": ""}
                }
            )
        # No doc claimed — check elapsed time before waiting and retrying
        elapsed = time.time() - start_time
        if elapsed >= timeout:
            print(f"[{WORKER_ID}] {thread_name} exiting after {elapsed:.2f}s")
            return
        time.sleep(5)  # avoid hammering the DB and the public API

# --- Initialize and run multiple processing threads ---
def main():
    print("\nInserting documents")
    insert_test_docs()
    print("\nStarting threads")
    threads = []
    for i in range(THREADS):
        t = threading.Thread(target=process_document, name=f"T{i}")
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    print(f"\n[{WORKER_ID}] Check final documents:")
    for doc in messages.find({}, {"originatingIp": 1, "location.query": 1,
                                  "location.country": 1, "location.message": 1,
                                  "lock.by": 1, "lock.until": 1}):
        print(doc)

if __name__ == "__main__":
    main()
```
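To try it yourself, you need a MongoDB instance reachable at `mongodb://127.0.0.1:27017` (the connection string assumes a direct connection to a local `mongod`) and the `pymongo` and `requests` packages installed, for example with `pip install pymongo requests`. Note that the script drops and recreates the `message` collection in the `test` database on each run.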
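As noted earlier, because the lock state lives in the documents themselves, a plain query is enough to see what is in flight while the script runs. Here is a small monitoring sketch, assuming the script's `messages` collection in the same session:

```python
# Hypothetical monitoring snippet: unprocessed vs. currently claimed messages
unprocessed = messages.count_documents({"location": None, "lock": {"$exists": False}})
in_flight = messages.count_documents({"location": None, "lock": {"$exists": True}})
print(f"unprocessed={unprocessed} in_flight={in_flight}")

# List who holds each in-flight lock and when it expires
for doc in messages.find(
    {"location": None, "lock": {"$exists": True}},
    {"_id": 0, "originatingIp": 1, "lock.by": 1, "lock.until": 1},
):
    print(doc)
```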
The script's functions:

- `insert_test_docs()` inserts test data with random IP addresses into a new collection `message`, and creates a partial index to find the messages to process (`{location: null}`).
- `claim_document()` atomically updates a message to claim it, adding lock information so that another thread will not pick the same one, then fetches the document. The criteria are that it must still need processing (`{location: null}`) and either be unlocked or hold an expired lock (with a 1 s grace period to account for clock skew).
- `fetch_location()` calls the public API, here to get location information for an IP address.
- `process_document()` calls `claim_document()` to get a message to process, with a lock. It calls `fetch_location()` and updates the document with the location, verifying that the lock is still in place before the update and then unsetting it. Each thread runs this in a loop, claiming and processing documents until the timeout.
- `main()` calls these in sequence and displays the final documents.

## Technical Insights

MongoDB's storage engine guarantees atomicity for each `update_one` through its WriteUnitOfWork and RecoveryUnit mechanisms. However, maintaining read consistency across multiple operations requires application-level coordination. In this implementation, that coordination is provided by an atomic claim with conditional criteria, ensuring that only one worker can lock an unprocessed or expired document at a time.

Several safeguards mitigate race conditions. The claim step narrows matches using the worker ID, the lock expiry, and a random token, and the final update re-verifies all of these fields before committing changes, preventing stale or stolen locks from being applied. Lock expiration enables automatic recovery from failures, and a small grace window accounts for clock skew in distributed systems. Write conflicts during concurrent updates are resolved automatically at the storage layer via optimistic concurrency control, which ensures correctness without blocking other operations. The result is a robust, non-blocking parallel processing workflow that preserves document-level ACID guarantees while scaling effectively in shared or cloud environments.
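You can observe the claim's atomicity directly. The sketch below, which assumes the script's definitions (`messages`, `claim_document`, `threading`) are loaded in the same session and which drops the collection, runs many concurrent claim attempts against a single unprocessed document; exactly one `update_one` should modify it within the lock duration:

```python
# Hypothetical check: with one unprocessed document, concurrent claims
# should be granted exactly once within the lock duration.
messages.drop()  # destructive: resets the test collection
messages.insert_one({"originatingIp": "8.8.8.8", "location": None})

results = []

def try_claim():
    results.append(claim_document() is not None)

workers = [threading.Thread(target=try_claim) for _ in range(10)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(f"claims granted: {sum(results)} out of {len(results)}")  # expected: 1
```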
## Final Recommendation

In this design, each thread processes one message at a time, in index order. Enforcing strict global message ordering would be more complex; the primary goal here is the scalability of the parallel processing. When migrating from PostgreSQL to MongoDB, as between any two databases, avoid a direct feature-by-feature mapping, because the systems are fundamentally different. SKIP LOCKED works around blocking `FOR UPDATE` reads in PostgreSQL, whereas reads and writes do not block each other in MongoDB. Instead of replicating another database's behavior, clarify the business requirement and design the most appropriate solution. In this example, rather than relying on generic transaction control as in SQL, we modeled object states, such as claim acquisition and expiration, and stored that state directly in the documents.