1. Build payload → ZSTD-compressed protobuf in a direct ByteBuffer
2. Resolve partition folder → sanitize id, validate against FSx root
3. Create batch directory → /fsx/<part>/year=Y/month=M/day=D/hour=H/batch-<uuid>/
4. Reserve in-flight bytes → byte-based backpressure (more on this below)
5. Write to a unique temp file → data.bin.tmp.<uuid>.1
6. fsync the temp file → optional, controlled by config
7. Atomic rename to data.bin → Files.move(tmp, final, ATOMIC_MOVE)
8. Publish pointer message → { filePath, batchId, sizeBytes, crc32c }
9. Release in-flight bytes → permit auto-closed
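Steps 5 to 7 can be sketched with plain NIO. This is a minimal illustration, not the production writer: the class and method names are hypothetical, and it writes a single buffer where the real path gathers several.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

final class AtomicBatchWriteSketch {

    /** Writes payload to a unique temp file, optionally fsyncs it, then renames it to data.bin. */
    static Path writeAtomically(Path batchDir, ByteBuffer payload, boolean fsync) throws IOException {
        Files.createDirectories(batchDir);
        Path tempFile = batchDir.resolve("data.bin.tmp." + UUID.randomUUID() + ".1");
        Path finalFile = batchDir.resolve("data.bin");
        try (FileChannel channel = FileChannel.open(tempFile,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            // A single write() may be partial, so loop until the buffer is drained.
            while (payload.hasRemaining()) {
                channel.write(payload);
            }
            if (fsync) {
                channel.force(true); // flush data and metadata before the rename
            }
        } catch (IOException e) {
            Files.deleteIfExists(tempFile); // do not leave a half-written temp file behind
            throw e;
        }
        // Readers see either no data.bin or a complete one, never a partial file.
        return Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Because the pointer message is published only after the rename succeeds, a consumer that receives a pointer should never open a file that is still being written.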
1. Pull notification message from bus
2. Validate pointer (path within allowed root, checksum optional)
3. Open the file via the local NFS mount
4. Stream the bytes — gather them, decompress, deserialize
5. Process the events
6. Acknowledge / delete the bus message
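Steps 2 to 4 of the consumer path might look like the sketch below. The names are hypothetical, the lowercase-hex encoding of the checksum is an assumption based on the example pointer, and the file is read in one call for brevity where a real consumer would stream it. The root check is purely lexical, so it does not resolve symlinks.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32C;

final class PointerValidationSketch {

    /** Returns the normalized path, or throws if it escapes allowedRoot. */
    static Path requireWithinRoot(Path allowedRoot, String filePath) {
        Path root = allowedRoot.toAbsolutePath().normalize();
        // resolve() returns filePath itself when it is absolute.
        Path candidate = root.resolve(filePath).normalize();
        if (!candidate.startsWith(root)) {
            throw new IllegalArgumentException("Pointer path escapes allowed root: " + filePath);
        }
        return candidate;
    }

    /** CRC32C of the bytes as 8 lowercase hex digits (assumed wire format). */
    static String crc32cHex(byte[] bytes) {
        CRC32C crc = new CRC32C();
        crc.update(bytes, 0, bytes.length);
        return String.format("%08x", crc.getValue());
    }

    /** Reads the file behind a pointer and checks its size and, if present, its checksum. */
    static byte[] readVerified(Path allowedRoot, String filePath, long sizeBytes,
                               String expectedCrc32cHex) throws IOException {
        Path file = requireWithinRoot(allowedRoot, filePath);
        byte[] bytes = Files.readAllBytes(file);
        if (bytes.length != sizeBytes) {
            throw new IOException("Size mismatch for " + file + ": " + bytes.length);
        }
        if (expectedCrc32cHex != null && !expectedCrc32cHex.equalsIgnoreCase(crc32cHex(bytes))) {
            throw new IOException("CRC32C mismatch for " + file);
        }
        return bytes;
    }
}
```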
private PayloadWriteOutcome writeGatheredBuffers(...) {
    final CRC32C checksumCrc32c = checksumEnabled ? new CRC32C() : null;
    long totalBytesWritten = 0L;
    int nextBufferIndex = nextReadableBufferIndex(payloadBuffers, 0);
    while (nextBufferIndex < payloadBuffers.size()) {
        ByteBuffer[] gatherSources = nextGatherSources(payloadBuffers, nextBufferIndex);
        updateChecksum(checksumCrc32c, gatherSources);
        while (hasRemaining(gatherSources)) {
            long bytesWritten = fileChannel.write(gatherSources); // writev under the hood
            totalBytesWritten += bytesWritten;
        }
        nextBufferIndex = nextReadableBufferIndex(payloadBuffers, nextBufferIndex + gatherSources.length);
    }
    return new PayloadWriteOutcome(totalBytesWritten, checksumCrc32c == null ? null : checksumCrc32c.getValue());
}
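The excerpt calls updateChecksum before the write loop, so the helper has to read the buffers without consuming them. The original helper is not shown; one plausible way to write it, offered here as an assumption, is to feed a duplicate of each buffer to the checksum:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

final class GatherChecksumSketch {

    /** Feeds the remaining bytes of each buffer into crc without moving the buffers' positions. */
    static void updateChecksum(CRC32C crc, ByteBuffer[] sources) {
        if (crc == null) {
            return; // checksumming disabled
        }
        for (ByteBuffer source : sources) {
            // duplicate() shares the content but has its own position and limit,
            // so the original buffer is still fully readable by the later write.
            crc.update(source.duplicate());
        }
    }
}
```

Updating the checksum buffer by buffer gives the same value as checksumming the concatenated payload, so the gather write and the checksum agree without copying the data into one array.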
{
  "filePath": "/fsx/<partition>/year=2026/month=05/day=24/hour=14/batch-abc123/data.bin",
  "batchId": "abc123",
  "partitionId": "<partition>",
  "sizeBytes": 524288,
  "crc32c": "f3c19a2b"
}
if (this.fileSystemOperations.exists(finalFilePath)) {
    return existingResultAsync(request, finalFilePath, payload, expectedBytes, attempt);
}

// in toExistingResult:
long existingBytes = this.fileSystemOperations.size(finalFilePath);
if (existingBytes != expectedBytes) {
    throw new FsxWriteException("Existing FSx file does not match expected payload size: " + finalFilePath);
}
return toWriteResult(request, finalFilePath, existingBytes, expectedChecksumCrc32c, attempt, true);
final class FsxInFlightByteLimiter {
    private final long maxInFlightBytes;
    private final int maxInFlightOperations;
    private long inFlightBytes;
    private int inFlightOperations;
    private final Queue<PendingAcquire> pendingAcquires = new ArrayDeque<>();

    CompletableFuture<Permit> acquire(long requestedBytes, Duration timeout, ScheduledExecutorService scheduler) {
        PendingAcquire pending = new PendingAcquire(toReservedBytes(requestedBytes));
        synchronized (monitor) {
            if (pendingAcquires.isEmpty() && canAcquire(pending)) {
                acquireNow(pending);
                completeNow(pending);
            } else {
                pendingAcquires.add(pending);
            }
        }
        scheduler.schedule(() -> timeout(pending), timeout.toMillis(), TimeUnit.MILLISECONDS);
        return pending.future();
    }

    private boolean canAcquire(PendingAcquire pending) {
        return inFlightOperations < maxInFlightOperations
            && inFlightBytes + pending.reservedBytes() <= maxInFlightBytes;
    }
}
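To make the two caps concrete, here is a deliberately simplified, self-contained sketch. It is not the limiter above: it has no pending queue and no timeout, it only answers "can this write start right now?", and every name in it is hypothetical.

```java
import java.util.Optional;

final class ByteBudgetSketch {
    private final long maxInFlightBytes;
    private final int maxInFlightOperations;
    private long inFlightBytes;
    private int inFlightOperations;

    ByteBudgetSketch(long maxInFlightBytes, int maxInFlightOperations) {
        this.maxInFlightBytes = maxInFlightBytes;
        this.maxInFlightOperations = maxInFlightOperations;
    }

    /** A reservation that returns its bytes to the budget when closed. */
    final class Permit implements AutoCloseable {
        private final long bytes;
        private boolean released;

        private Permit(long bytes) {
            this.bytes = bytes;
        }

        @Override
        public void close() {
            release(this);
        }
    }

    /** Reserves bytes if both the operation cap and the byte cap allow it. */
    synchronized Optional<Permit> tryAcquire(long bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be non-negative");
        }
        boolean fits = inFlightOperations < maxInFlightOperations
            && inFlightBytes + bytes <= maxInFlightBytes;
        if (!fits) {
            return Optional.empty();
        }
        inFlightOperations++;
        inFlightBytes += bytes;
        return Optional.of(new Permit(bytes));
    }

    private synchronized void release(Permit permit) {
        if (permit.released) {
            return; // closing twice must not return the bytes twice
        }
        permit.released = true;
        inFlightOperations--;
        inFlightBytes -= permit.bytes;
    }

    synchronized long inFlightBytes() {
        return inFlightBytes;
    }
}
```

With a budget of 100 bytes and 2 operations, a 60-byte write is admitted, a following 50-byte write is refused on bytes, and a 40-byte write is admitted; a third concurrent write is then refused on the operation count regardless of its size.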
/fsx-root/
  <partitionId>/
    year=2026/
      month=05/
        day=24/
          hour=14/
            batch-<uuid>/
              data.bin
              [optional metadata files]
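A batch directory in this layout could be derived from a partition id and a timestamp roughly as follows. The sanitizing rule (anything outside letters, digits, underscore, and hyphen becomes an underscore), the use of UTC, and all names are assumptions made for the sketch.

```java
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

final class BatchPathSketch {

    /** Replaces every character outside [A-Za-z0-9_-] so an id cannot contain separators or dots. */
    static String sanitize(String id) {
        String cleaned = id.replaceAll("[^A-Za-z0-9_-]", "_");
        if (cleaned.isEmpty()) {
            throw new IllegalArgumentException("id must not be empty");
        }
        return cleaned;
    }

    /** Builds root/partition/year=Y/month=M/day=D/hour=H/batch-id and checks it stays under root. */
    static Path batchDirectory(Path fsxRoot, String partitionId, Instant when, String batchId) {
        ZonedDateTime utc = when.atZone(ZoneOffset.UTC);
        Path root = fsxRoot.toAbsolutePath().normalize();
        Path dir = root
            .resolve(sanitize(partitionId))
            .resolve(String.format(Locale.ROOT, "year=%04d", utc.getYear()))
            .resolve(String.format(Locale.ROOT, "month=%02d", utc.getMonthValue()))
            .resolve(String.format(Locale.ROOT, "day=%02d", utc.getDayOfMonth()))
            .resolve(String.format(Locale.ROOT, "hour=%02d", utc.getHour()))
            .resolve("batch-" + sanitize(batchId))
            .normalize();
        // Defense in depth: even after sanitizing, refuse anything outside the root.
        if (!dir.startsWith(root)) {
            throw new IllegalArgumentException("Batch directory escapes FSx root: " + dir);
        }
        return dir;
    }
}
```

Zero-padded, key=value directory names keep lexical order equal to chronological order, which makes hour-level cleanup and inspection with ordinary shell tools straightforward.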
producer/consumer container
        │
        ▼
/fsx-root/  (Linux NFS mount, nconnect=16, NFSv4.1)
        │
        ▼
AsynchronousFileChannel, FileChannel.write(ByteBuffer[])
CompletableFuture<FsxFileWriteResult> writeFuture = inFlightByteLimiter.acquire(...)
    .thenCompose(permit -> writeWithPermit(request, payload, payloadBytes, permit));
writeFuture.whenComplete((result, throwable) -> payload.close());
return withTimeout(writeFuture, configuration.getWriteTimeout())
    .whenComplete((result, throwable) -> recordWriteMetrics(...));
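The withTimeout helper is not shown in the excerpt. One way it could be written on Java 9 or later, offered as an assumption rather than the original implementation, is to time out a copy of the future so the underlying write future is left untouched:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class WithTimeoutSketch {

    /**
     * Returns a future that mirrors source but fails with a TimeoutException
     * if source has not completed within the timeout. The source itself is not
     * completed or cancelled by the timeout.
     */
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> source, Duration timeout) {
        return source.copy().orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Under this reading, the caller sees the timeout, while the callback that closes the payload stays attached to the real write future and runs only once the write has actually finished, so a buffer is never released while the channel may still be reading from it.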
- Is produced in continuous high volume.
- Is consumed shortly after production — usually within seconds or minutes, almost never more than an hour or two.
- After consumption, is either archived for compliance/audit or deleted entirely.
- Has no value sitting on a transport between producer and consumer beyond getting from one side to the other.

- The bus carries small messages, so the bus cost collapses. Pointer messages are roughly two hundred bytes each. A bus that was strained to carry 700 MB/s of payload is now carrying maybe 200 KB/s of pointers. Shard counts can drop by an order of magnitude or more. One real architecture I've seen reduced a Kinesis shard count from 700 to 32, a 95% reduction in transport spend, with no change to the actual byte volume being moved.
- The filesystem is billed for what it stores, not for what passes through it. You pay for the bytes that exist, not the bytes that have existed and have already been consumed and deleted. If your retention is one hour, you pay for one hour's worth of bytes resident at any given moment.
- The filesystem gives you primitives the bus doesn't. Snapshots. Random access. Concurrent multi-reader semantics. Standard POSIX tools to inspect, validate, and debug. Your data-engineering team can ls your transitional data. They cannot ls a Kinesis stream.
- The compute path is the same speed or faster. A modern NFS mount on the same VPC moves data faster than any HTTP-based streaming bus. Once you understand the math, the streaming-bus model is the slow option, not the fast one.

- Frequent Access: data touched within the last 30 days. Baseline tier, sub-millisecond reads from cache, full performance.
- Infrequent Access: data not touched for 30 to 90 days. Roughly 44% cheaper than Frequent Access.
- Archive Instant Access: data not touched for 90+ days. Roughly 65% cheaper than Infrequent Access. Still online, no restore needed; first-byte latency in the tens of milliseconds.

- maxInFlightOperations: caps the number of concurrent writes (still useful to prevent thundering herds on the file I/O thread pool).
- maxInFlightBytes: caps the aggregate payload size of concurrent writes (the real protection against memory blow-up).

- aws-sdk-java-v2 + S3AsyncClient + S3 Access Points for FSx. Works if you expose your FSx file system through an S3 access point. Mature, async, multipart, retries, integrates with the AWS SDK ecosystem. The downside is that S3-style access has tens-of-milliseconds latency rather than the sub-millisecond latency of an NFS mount. For batch jobs that's fine; for hot transitional data you give up most of the performance advantage.
- dCache/nfs4j. A pure-Java NFSv3 and NFSv4 implementation. The most serious Java NFS library available. It's actively maintained, has a JMH benchmarks module, runs on Java 17. If you absolutely need to write your own NFS client (or extend one), this is where you'd start. But it's not a turnkey FSx client — you'd be building protocol code, and AWS's own performance documentation is opinionated about mount-time options that are easier to apply with the kernel client.
- EMCECS/nfs-client-java. NFSv3 only, dependent on a years-old version of Netty. Workable for legacy use, not a foundation for a petabyte-scale system in 2026.
- SMBJ, jcifs. SMB protocol clients. Wrong protocol family — FSx for OpenZFS doesn't speak SMB.
- Hadoop / Spark NFS connectors. Useful for ideas about request pipelining, not a foundation.

- Use nconnect=16 (or up to your kernel's supported maximum) to parallelize NFS over 16 TCP connections.
- Set rsize=1048576 and wsize=1048576 for 1 MiB read/write chunks.
- Use NFSv4.1 for the locking and the cleaner failover semantics.
- Place producers and consumers in the same AZ as the file system primary for sub-ms latency and to avoid cross-AZ data transfer charges.

- fsx.write.success / fsx.write.failed: counters of successful and failed writes
- fsx.write.latency: write latency in milliseconds
- fsx.write.bytes: total bytes written
- fsx.write.retry: counter of retry attempts
- fsx.write.inflight.bytes: current bytes reserved by running writes
- fsx.write.inflight.operations: current running write count
- fsx.write.backpressure.pending: current queue depth on the limiter