Designing a Restartable, Lossless, High-Throughput Migration Pipeline
In large-scale data migrations, what makes the difference is controlling system pressure, preventing data loss, and keeping the operation sustainable while millions of records are in transit.
This article describes a real-world migration problem and the architecture of a solution designed by the Keymate team to move tens of millions of records from an external database into Keycloak, utilizing a database-backed work queue, bounded concurrency, durable retry semantics, and reactive execution.
The solution is implemented as an open-source migrator application, published at: Keymate Migrator on GitHub.
At a high level, the problem sounds deceptively simple:
Read a large number of records from an external database and send them to a target system as fast as possible.
In reality, the constraints make it significantly harder.
The target system (the Keycloak import endpoint) must be driven at a steady, controlled load to fully utilize its capacity and complete the migration within a reasonable time window.

A common first attempt looks like this:
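The original snippet is not reproduced here, but a minimal sketch of that naive pattern (sourceDb, keycloak, and their methods are illustrative names, not part of the migrator) looks something like this:

// Naive approach: read everything, then throw a big thread pool at it
ExecutorService pool = Executors.newFixedThreadPool(500);
for (SourceRecord record : sourceDb.fetchAll()) {       // pulls millions of rows
    pool.submit(() -> keycloak.importUser(record));     // unbounded pressure on the target
}
pool.shutdown();                                        // ...and hope nothing crashes before it finishes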
This approach usually fails in predictable ways: the target system is overwhelmed, transient errors turn into retry storms, and a crash halfway through leaves no reliable record of what has already been migrated.
The fundamental mistake is assuming that more threads equal more throughput.
In this context, bounded concurrency is not an optimization technique but a safety mechanism that keeps throughput stable while protecting downstream systems. Throughput is determined by the slowest bottleneck in the chain, not by the number of workers.
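To put rough numbers on that (the figures here are illustrative, not measurements from the project): if the import endpoint sustains about 500 requests per second at roughly 50 ms per request, Little's law says only 500 × 0.05 ≈ 25 requests need to be in flight to saturate it. A thousand workers would not move data any faster; they would only pile up queues and timeouts in front of the same bottleneck.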
Instead of pushing data directly from the source into the target, we introduce an explicit work queue.
The work_queue Table

All migration tasks are first materialized into a table. This pattern closely matches the Transactional Outbox / Work Queue pattern, with the database acting as the source of truth for progress and retry state.
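The exact DDL lives in the project; a minimal schema consistent with the queries shown in this article (column types and defaults are assumptions) would look roughly like this:

-- Illustrative schema; the actual DDL in the project may differ
CREATE TABLE work_queue (
    id             BIGSERIAL PRIMARY KEY,
    source_table   TEXT        NOT NULL,
    source_pk      TEXT        NOT NULL,
    payload        JSONB       NOT NULL,
    attempt_count  INT         NOT NULL DEFAULT 0,
    next_retry_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (source_table, source_pk)   -- required for ON CONFLICT in the enqueue query
);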
As described in the Configuration Power section, we use app.claimers to define how many concurrent threads compete to fetch work from the database. Without proper handling, this would create a significant bottleneck, with threads blocking each other while waiting for row-level locks to be released.
To solve this, we leverage a powerful PostgreSQL feature: the FOR UPDATE SKIP LOCKED clause. This allows our concurrent threads to operate at high velocity without needing a complex distributed locking mechanism.
WITH picked AS (
    SELECT id
    FROM work_queue
    WHERE next_retry_at <= now()
    ORDER BY id
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
DELETE FROM work_queue w
USING picked p
WHERE w.id = p.id
RETURNING w.id, w.source_table, w.source_pk, w.payload, w.attempt_count;
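The repository method that executes this statement is not shown in the article; a simplified sketch of what claimBatch could look like with the reactive client (the injected io.vertx.mutiny.pgclient.PgPool, the CLAIM_SQL constant, and the WorkJob type are assumptions for illustration) is:

public Uni<List<WorkJob>> claimBatch(int limit) {
    // CLAIM_SQL holds the DELETE ... RETURNING statement shown above
    return pool.preparedQuery(CLAIM_SQL)
        .execute(Tuple.of(limit))
        .onItem().transform(rows -> {
            List<WorkJob> jobs = new ArrayList<>();
            for (Row row : rows) {
                jobs.add(new WorkJob(
                        row.getLong("id"),
                        row.getString("source_table"),
                        row.getString("source_pk"),
                        row.getValue("payload"),        // JSONB payload
                        row.getInteger("attempt_count")));
            }
            return jobs;
        });
}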
By using this pattern, we achieve several critical operational benefits:
- Claimers skip rows that are already locked instead of blocking on them, so there is no lock contention and no wasted CPU utilization.
- The transition from READY to PROCESSING happens in a single atomic database operation. This guarantees that no two threads, whether in the same instance or across multiple containers, ever process the same record.
- No external coordination layer, such as Redis, is needed.

To guarantee that we don't process the same record twice (idempotency) or lose track of what's been done, we use a processed_log table alongside the work queue. The following SQL snippet demonstrates how we enqueue work only for items that haven't been successfully processed yet:
-- Enqueue orders that haven't been processed successfully
INSERT INTO work_queue (source_table, source_pk, payload)
SELECT 'orders', o.id, jsonb_build_object('id', o.id, 'data', o.data)
FROM orders o
WHERE NOT EXISTS (
    SELECT 1
    FROM processed_log p
    WHERE p.source_table = 'orders'
      AND p.source_pk = o.id
      AND p.status = 'OK'
)
ON CONFLICT (source_table, source_pk) DO NOTHING;
This simple mechanism ensures that if the migration is interrupted and restarted, it picks up exactly where it left off, without duplicating work.
The migrator runs a continuous processing loop:
- Claim a batch of due jobs using the claim query shown earlier.
- Process the claimed jobs with bounded concurrency.
- Record successes in the processed_log.
- Requeue failures, with backoff, back into the work_queue.

Instead of letting concurrency grow unbounded, the system enforces strict limits:

- a fixed number of claimer threads competing for work (app.claimers),
- a fixed number of jobs processed in parallel (app.concurrency),
- a capped reactive database connection pool (quarkus.datasource.reactive.max-size).
This keeps throughput stable and resource usage predictable, and it prevents retry storms. Concurrency is treated as a tuning parameter, not an optimization shortcut.
We control this behavior through simple configuration properties, allowing us to tune the migrator from a gentle trickle to a high-throughput stream without changing code:
# Controls how many jobs are processed in parallel
app.concurrency=64
# Controls how many threads compete to claim work from the DB
app.claimers=32
# Defines the maximum size of the reactive DB connection pool
quarkus.datasource.reactive.max-size=100
The migrator is built using Quarkus, Mutiny, and a Reactive PostgreSQL client.
The reactive model provides non-blocking I/O and efficient use of event loops. Importantly, reactive does not mean uncontrolled parallelism. All reactive streams are still explicitly bounded.
The core processing logic is a reactive pipeline that flows from claiming a job to processing it and handling the result. Here is a streamlined version of the production loop that demonstrates the bounded concurrency pattern:
private Uni<Void> runBatch() {
    return repo.claimBatch(fetchSize)
        .onItem().transformToUni(list -> {
            if (list.isEmpty()) {
                // Nothing to do: wait a little before polling the queue again
                return Uni.createFrom().voidItem().onItem().delayIt().by(Duration.ofMillis(idleMs));
            }
            // 1) Split jobs into chunks of size 'concurrency'
            return Multi.createFrom().iterable(list)
                .group().intoLists().of(concurrency) // derived from app.concurrency (see Configuration Power)
                // 2) Process chunks sequentially
                .onItem().transformToUniAndConcatenate(chunk ->
                    // 3) Process jobs within the chunk in parallel
                    Multi.createFrom().iterable(chunk)
                        .onItem().transformToUniAndMerge(job ->
                            // The core business logic
                            router.process(job)
                                .call(() -> repo.markOk(job))
                                // Robust failure handling (DB requeue -> Disk Outbox)
                                .onFailure().call(err -> repo.requeue(job, err)
                                    .onFailure().call(e -> outboxBuffer.offer(job, err))
                                )
                                // Resilience: don't break the loop on individual errors
                                .onFailure().recoverWithUni(Uni.createFrom().voidItem())
                        )
                        .collect().asList()
                        .replaceWithVoid()
                )
                .collect().asList()
                .replaceWithVoid();
        });
}
In this snippet, transformToUniAndMerge is the hero. It subscribes to the inner streams concurrently, but crucially, that concurrency is not implicit or unbounded.
The upper limit is defined earlier in the Configuration Power section via app.concurrency. That same value is used as the chunk size in the reactive pipeline, making it the single source of truth for how many requests may be in flight at once.
This guarantees that the system will never exceed app.concurrency concurrent operations, regardless of how fast the database can supply new jobs.
This structure allows us to handle thousands of concurrent operations with a very small number of OS threads, maximizing efficiency.
With the arrival of Java 21, Virtual Threads (Project Loom) are often seen as the default replacement for reactive programming. However, for this specific use case, we deliberately chose the Reactive model (Mutiny) over Virtual Threads.
The reasoning is threefold:
- Backpressure: the reactive streams model gives us non-blocking backpressure mechanisms that Virtual Threads do not offer out of the box.
- Declarative composition: operators such as .retry().withBackOff(), .onFailure().recoverWith(), and .collect().asList() allow us to express complex error handling and batching logic declaratively.
- Resource awareness: Virtual Threads are cheap, but the resources they access (DB connections, file handles) are not. Spawning 10,000 virtual threads is easy; opening 10,000 database connections is disastrous. The reactive model forces us to be explicit about concurrency limits, preventing the illusion of infinite capacity that can lead to resource exhaustion.

Failures are expected in large migrations: network hiccups, transient target errors, or temporary database contention. The key question is how failures are handled.
We treat failure handling as a lifecycle with two critical phases: Persistence and Scheduling.
When a job fails, the error context is captured immediately. We employ a dual-layer safety net to ensure this failure is never lost:
- Primary: the job is requeued in the database (the work_queue table) with an incremented attempt count.
- Fallback: if the database itself is unavailable, the failure is appended to a disk-based outbox so the job is never silently lost.

This logic is implemented directly in the reactive pipeline:
// Inside the processing pipeline:
.onFailure().call(err ->
    // 1. Try to persist failure state to the Database
    repo.requeue(job, err)
        // 2. If the DB fails (e.g., connection lost), fall back to the Disk Outbox
        .onFailure().call(dbErr ->
            outboxBuffer.offer(job, err)
        )
)
Persisting the failure is only half the battle. The next question is when to retry.
Failed jobs are not immediately retried. Instead:
- The attempt_count is incremented as part of the requeue.
- A next_retry_at timestamp is computed using an exponential backoff strategy.
- The claim query only picks up jobs whose next_retry_at has passed (WHERE next_retry_at <= now()).

This prevents retry storms, cascading failures, and target system overload. Retries become a controlled flow, not a panic reaction.
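The exact backoff formula lives in the repository; a minimal sketch of the idea (the base delay, cap, and method name here are assumptions, not the project's actual values) looks like this:

// Illustrative only: exponential backoff with a cap
Instant computeNextRetryAt(int attemptCount) {
    long baseSeconds = 5;                                            // first retry after ~5 seconds
    long exponent = Math.min(attemptCount, 10);                      // avoid absurd shifts on high attempt counts
    long delaySeconds = Math.min(15 * 60L, baseSeconds << exponent); // cap at 15 minutes
    return Instant.now().plusSeconds(delaySeconds);
}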
The disk outbox acts as a temporary buffer during database outages. It is segmented for efficient append and drain operations. A background drainer process continuously monitors these files and restores the jobs to the database when it becomes available:
private Uni<Void> drainOnce() {
    // 1. Read a batch of failed jobs from the disk segment
    return ioOnWorker(this::readBatchUnsafe)
        .onItem().transformToUni(batch ->
            // 2. Process them in parallel (bounded by dbConcurrency)
            Multi.createFrom().iterable(batch.items())
                .group().intoLists().of(dbConcurrency)
                .onItem().transformToUniAndConcatenate(chunk ->
                    // 3. Write back to the main Work Queue in the DB
                    Multi.createFrom().iterable(chunk)
                        .onItem().transformToUniAndMerge(line ->
                            repo.requeueParams(line.req())
                        )
                        .collect().asList()
                )
                .collect().asList()
                .replaceWithVoid()
                // 4. If successful, advance the file offset (commit)
                .invoke(() -> writeOffsetAtomic(batch.off(), newOffset))
        );
}
Because all state lives in durable storage:
- The migration can be stopped and restarted at any time and picks up exactly where it left off.
- The last_error column in the processed_log table provides immediate insight into why records are failing.
- The processed_log table keeps a history of every attempt.

Operationally, it behaves like a pipeline, not a fire-and-forget script.
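It also means progress and failures can be inspected with plain SQL. For example, an illustrative query against the columns described above (not a script shipped with the project):

-- Most common failure reasons per source table
SELECT source_table, last_error, count(*) AS failed_records
FROM processed_log
WHERE status <> 'OK'
GROUP BY source_table, last_error
ORDER BY failed_records DESC;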
Theory is good, but practice is better. To validate this architecture, we built a simulation environment included in the project.
We developed a Mock REST Server that mimics a flaky Keycloak instance. It introduces random delays and HTTP 500 errors, allowing us to observe how the migrator handles chaos.
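The project's mock server has its own implementation; a stripped-down illustration of the same idea, a Jakarta REST resource that randomly delays and fails (the path, class name, and rates here are arbitrary), is:

// Illustrative flaky endpoint, not the project's actual mock server
@Path("/mock/users")
public class FlakyImportResource {

    private static final Random RANDOM = new Random();

    @POST
    public Response importUser(String body) throws InterruptedException {
        Thread.sleep(RANDOM.nextInt(200));           // random delay of up to ~200 ms
        if (RANDOM.nextInt(10) == 0) {               // roughly 10% of requests fail
            return Response.serverError().build();   // HTTP 500
        }
        return Response.status(Response.Status.CREATED).build();
    }
}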
Even under simulated heavy load with a 10% error rate, the migrator keeps draining the queue, exercising exactly the retry, requeue, and disk-outbox paths described above. The simulation shows that the architecture holds up under pressure.
For detailed instructions on how to run this simulation yourself (including setting up the Mock Server and generating synthetic data), please refer to the Simulation Guide in the project's README.
Large-scale migrations are not solved by clever loops or bigger thread pools. They are solved by a durable, database-backed work queue, bounded concurrency, and explicit, observable retry semantics.
This migrator prioritizes safe, lossless, and restartable execution over raw speed. However, by design, those guarantees do not come at the cost of throughput. The same architecture has allowed us to migrate large-scale datasets into Keycloak efficiently and predictably, as shown in the Proven in Production section below.
At scale, that balance is what really matters.
This architecture isn't just theory. It was the engine behind a massive migration effort where we successfully moved over 20 million identities to Keycloak with zero data loss.
📖 Read the full case study: How Keymate Migrated 20+ Million Identities to Keycloak
This is Part 2 of the Migration at Scale series.
Planning a large-scale IAM migration? Learn how Keymate helps teams migrate safely without downtime.