
Keymate's Guide to Reactive Data Migration

Keymate Team
January 2026

Designing a Restartable, Lossless, High-Throughput Migration Pipeline

In large-scale data migrations, the decisive factors are controlling system pressure, preventing data loss, and keeping the operation sustainable while millions of records are in transit.

This article describes a real-world migration problem and the architecture the Keymate team designed to solve it: moving tens of millions of records from an external database into Keycloak using a database-backed work queue, bounded concurrency, durable retry semantics, and reactive execution.

The solution is implemented as an open-source migrator application, published at: Keymate Migrator on GitHub.


The Problem We Were Solving

At a high level, the problem sounds deceptively simple:

Read a large number of records from an external database and send them to a target system as fast as possible.

In reality, the constraints make it significantly harder.

Core Requirements

  • Very Large Data Volume: Millions (and potentially tens of millions) of records.
  • High Concurrency: The target system (e.g., a Keycloak import endpoint) must be driven at a steady, controlled load to fully utilize its capacity and complete the migration within a reasonable time window.
  • Zero Data Loss: No silent drops, even in the presence of crashes or restarts.
  • Controlled Retries: Failures must be retried but without retry storms or uncontrolled load amplification.
  • Restartability: The process must safely resume after restarts without manual intervention.

Why "Just Parallelize It" Fails

A common first attempt looks like this:

  1. Read rows from the source database.
  2. Submit them to a thread pool.
  3. Retry on failure.
  4. Increase thread count when it's slow.

This approach usually fails in predictable ways:

  • Unlimited parallelism overwhelms databases and APIs.
  • Immediate retries amplify transient failures.
  • In-memory retry buffers lose data on crashes.
  • Progress tracking becomes unreliable.
  • Restarting the process becomes risky.

The fundamental mistake is assuming that more threads equal more throughput.

In this context, bounded concurrency is not an optimization technique, but a safety mechanism that keeps throughput stable while protecting downstream systems. Throughput is determined by the slowest bottleneck, not by the number of workers.

Introducing a Database-Backed Work Queue

Instead of pushing data directly from the source into the target, we introduce an explicit work queue.

The work_queue Table

All migration tasks are first materialized into a table. This pattern closely matches the Transactional Outbox / Work Queue pattern, with the database acting as the source of truth for progress and retry state.

  • One row = one migration job
  • Each row represents a unit of work
  • State, retries, and scheduling metadata are persisted
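
To make the shape of a job concrete, here is a minimal sketch of the record a claimed row could map to. The field names mirror the columns used by the claim query below; the migrator's actual schema may differ.

// A sketch of the unit of work a single work_queue row represents (illustrative, not the project's exact type)
public record WorkItem(
        long id,                    // work_queue primary key
        String sourceTable,         // origin table in the external database
        String sourcePk,            // primary key of the source record
        JsonObject payload,         // record data to send to the target system (stored as jsonb)
        int attemptCount,           // how many times this job has already been attempted
        OffsetDateTime nextRetryAt  // earliest time the job becomes eligible again
) {}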

As described in the Configuration Power section, we use app.claimers to define how many concurrent threads compete to fetch work from the database. Without proper handling, this would create a significant bottleneck where threads block each other while waiting for row-level locks to be released.

To solve this, we leverage a powerful PostgreSQL feature: the FOR UPDATE SKIP LOCKED clause. This allows our concurrent threads to operate at high velocity without needing a complex distributed locking mechanism.

WITH picked AS (
    SELECT id
    FROM work_queue
    WHERE next_retry_at <= now()
    ORDER BY id
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
DELETE FROM work_queue w
USING picked p
WHERE w.id = p.id
RETURNING w.id, w.source_table, w.source_pk, w.payload, w.attempt_count;

By using this pattern, we achieve several critical operational benefits:

  • Non-Blocking Claims: Unlike a standard FOR UPDATE (which makes other threads wait for the lock), SKIP LOCKED tells the database: If another thread is already working on this row, ignore it and give me the next available one.
  • Thread Independence: Even with a high number of app.claimers (e.g., 128 or 256), each thread moves instantly to the next free batch, eliminating wait states and maximizing CPU utilization.
  • Atomic State Transitions: The transition from ready to in-flight happens in a single atomic database operation (the DELETE ... RETURNING shown above). This guarantees that no two threads—whether in the same instance or across multiple containers—ever process the same record.
  • Native Queue Management: This effectively transforms a standard relational table into a high-performance, ACID-compliant work queue without the overhead of external tools like Redis.

Database-backed work queue architecture

Ensuring Exactly-Once Processing

To guarantee that we don't process the same record twice (idempotency) or lose track of what's been done, we use a processed_log table alongside the work queue. The following SQL snippet demonstrates how we enqueue work only for items that haven't been successfully processed yet:

-- Enqueue orders that haven't been processed successfully
INSERT INTO work_queue (source_table, source_pk, payload)
SELECT 'orders', o.id, jsonb_build_object('id', o.id, 'data', o.data)
FROM orders o
WHERE NOT EXISTS (SELECT 1
                  FROM processed_log p
                  WHERE p.source_table = 'orders'
                    AND p.source_pk = o.id
                    AND p.status = 'OK')
    ON CONFLICT (source_table, source_pk) DO NOTHING;

This simple mechanism ensures that if the migration is interrupted and restarted, it picks up exactly where it left off, without duplicating work.
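
For completeness, here is a minimal sketch of what the success-path bookkeeping could look like (the processing pipeline shown later calls it as repo.markOk). It assumes a Vert.x reactive client field named client, hypothetical Job accessors, and a unique constraint on (source_table, source_pk) in processed_log; the migrator's actual repository code may differ.

// Sketch: record a successful migration in processed_log so the enqueue query above skips it on restart
public Uni<Void> markOk(Job job) {
    return client
        .preparedQuery("""
            INSERT INTO processed_log (source_table, source_pk, status)
            VALUES ($1, $2, 'OK')
            ON CONFLICT (source_table, source_pk) DO UPDATE SET status = 'OK'
            """)
        .execute(Tuple.of(job.sourceTable(), job.sourcePk()))
        .replaceWithVoid();
}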

Processing Loop: Controlled Parallelism

The migrator runs a continuous processing loop:

  1. Claim a batch of ready jobs from work_queue.
  2. Split the batch into bounded chunks.
  3. Process each chunk in parallel.
  4. Persist success or failure.
  5. Repeat until the queue is empty.

Why Bounded Concurrency Matters

Instead of letting concurrency grow unbounded, the system enforces strict limits:

  • Maximum number of in-flight jobs.
  • Maximum number of database connections.
  • Maximum pressure on the target system.

This ensures stable throughput and predictable resource usage, and prevents retry storms. Concurrency is treated as a tuning parameter, not an optimization shortcut.

Configuration Power

We control this behavior through simple configuration properties, allowing us to tune the migrator from a gentle trickle to a high-throughput stream without changing code:

# Controls how many jobs are processed in parallel
app.concurrency=64

# Controls how many threads compete to claim work from the DB
app.claimers=32

# Defines the maximum size of the reactive DB connection pool
quarkus.datasource.reactive.max-size=100

Reactive Execution Model

The migrator is built using Quarkus, Mutiny, and a Reactive PostgreSQL client.

The reactive model provides non-blocking I/O and efficient use of event loops. Importantly, reactive does not mean uncontrolled parallelism. All reactive streams are still explicitly bounded.

Reactive processing pipeline

Reactive Pipeline Example

The core processing logic is a reactive pipeline that flows from claiming a job to processing it and handling the result. Here is a streamlined version of the production loop that demonstrates the bounded concurrency pattern:

private Uni<Void> runBatch() {
  return repo.claimBatch(fetchSize)
    .onItem().transformToUni(list -> {
      if (list.isEmpty()) return Uni.createFrom().voidItem().onItem().delayIt().by(idleMs);

      // 1) Split jobs into chunks of size 'concurrency'
      return Multi.createFrom().iterable(list)
        .group().intoLists().of(concurrency) // derived from app.concurrency (see Configuration Power)

        // 2) Process chunks sequentially
        .onItem().transformToUniAndConcatenate(chunk ->

          // 3) Process jobs within the chunk in parallel
          Multi.createFrom().iterable(chunk)
          .onItem().transformToUniAndMerge(job ->

            // The core business logic
            router.process(job)
            .call(() -> repo.markOk(job))

            // Robust failure handling (DB requeue -> Disk Outbox)
            .onFailure().call(err -> repo.requeue(job, err)
              .onFailure().call(e -> outboxBuffer.offer(job, err))
            )
            // Resilience: Don't break the loop on individual errors
            .onFailure().recoverWithUni(Uni.createFrom().voidItem())
          )
          .collect().asList()
          .replaceWithVoid()
        )
        .collect().asList()
        .replaceWithVoid();
    });
}

In this snippet, transformToUniAndMerge is the hero. It subscribes to the inner streams concurrently, but crucially, that concurrency is not implicit or unbounded.

The upper limit is defined earlier in the Configuration Power section via app.concurrency. That same value is used as the chunk size in the reactive pipeline, making it the single source of truth for how many requests may be in flight at once.

This guarantees that the system will never exceed app.concurrency concurrent operations, regardless of how fast the database can supply new jobs.

This structure allows us to handle thousands of concurrent operations with a very small number of OS threads, maximizing efficiency.

With the arrival of Java 21, Virtual Threads (Project Loom) are often seen as the default replacement for reactive programming. However, for this specific use case, we deliberately chose the Reactive model (Mutiny) over Virtual Threads.

The reasoning is threefold:

  1. First-Class Backpressure: A migration is a flow control problem. We need to slow down reading from the database if the target API slows down. Reactive Streams provide standard, non-blocking backpressure mechanisms that Virtual Threads do not offer out of the box.
  2. Rich Composition: Operators like .onFailure().retry().withBackOff(), .onFailure().recoverWithUni(), and .collect().asList() allow us to express complex error handling and batching logic declaratively.
  3. Explicit Resource Bounding: While Virtual Threads are cheap, the resources they access (DB connections, file handles) are not. Spawning 10,000 virtual threads is easy, but opening 10,000 database connections is disastrous. The reactive model forces us to be explicit about concurrency limits, preventing the illusion of infinite capacity that can lead to resource exhaustion.
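
To make the composition point concrete, here is a minimal sketch of how such operators chain together. It illustrates the operator style rather than the migrator's actual call site: importOne is a hypothetical remote call returning Uni<Void>, and repo.requeue is assumed to return Uni<Void> as in the pipeline above.

// Sketch: a few bounded, delayed in-memory retries, then hand the failure back to the durable queue
Uni<Void> send = importOne(job)
    .onFailure().retry()
        .withBackOff(Duration.ofMillis(200), Duration.ofSeconds(5))
        .atMost(3)
    .onFailure().recoverWithUni(err -> repo.requeue(job, err));

A handful of in-memory retries absorbs short-lived hiccups; anything that still fails is handed back to the durable queue described below.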

The Failure Lifecycle: Durable & Delayed

Failures are expected in large migrations: network hiccups, transient target errors, or temporary database contention. The key question is how failures are handled.

We treat failure handling as a lifecycle with two critical phases: Persistence and Scheduling.

Failure lifecycle with durable persistence and backoff scheduling

1. Durable Persistence (No Data Loss)

When a job fails, the error context is captured immediately. We employ a dual-layer safety net to ensure this failure is never lost:

  1. Primary: The failure is persisted to the Database (work_queue table) with an incremented attempt count.
  2. Fallback: If the database is unavailable (e.g., connection lost), the failure is written to a Disk-Backed Segmented Outbox.

This logic is implemented directly in the reactive pipeline:

// Inside the processing pipeline:
.onFailure().call(err ->
    // 1. Try to persist failure state to the Database
    repo.requeue(job, err)
        // 2. If DB fails (e.g., connection lost), fallback to Disk Outbox
        .onFailure().call(dbErr ->
            outboxBuffer.offer(job, err)
        )
)

2. Backoff-Aware Scheduling (No Retry Storms)

Persisting the failure is only half the battle. The next question is when to retry.

Failed jobs are not immediately retried. Instead:

  1. A next_retry_at timestamp is computed using an exponential backoff strategy.
  2. The job is reinserted into the work queue with this future timestamp.
  3. The processing loop naturally skips it until it becomes eligible.

This prevents retry storms, cascading failures, and target system overload. Retries become a controlled flow, not a panic reaction.
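
As an illustration, a next_retry_at value could be computed like this; the base delay, cap, and jitter below are illustrative values, not the migrator's exact constants.

// Sketch: exponential backoff with a cap and random jitter
OffsetDateTime computeNextRetryAt(int attemptCount) {
    long baseMs = 1_000;                                                   // first retry after ~1 second
    long capMs = 10 * 60 * 1_000L;                                         // never wait longer than 10 minutes
    long delayMs = Math.min(capMs, baseMs << Math.min(attemptCount, 20));  // 1s, 2s, 4s, 8s, ...
    long jitterMs = ThreadLocalRandom.current().nextLong(delayMs / 2 + 1); // spread retries apart
    return OffsetDateTime.now().plus(Duration.ofMillis(delayMs + jitterMs));
}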

The Role of the Disk Outbox

The disk outbox acts as a temporary buffer during database outages. It is segmented for efficient append and drain operations. A background drainer process continuously monitors these files and restores the jobs to the database when it becomes available:

private Uni<Void> drainOnce() {
    // 1. Read a batch of failed jobs from the disk segment (blocking I/O on a worker thread)
    return ioOnWorker(this::readBatchUnsafe)
        .onItem().transformToUni(batch ->
            // 2. Process them in parallel (bounded by dbConcurrency)
            Multi.createFrom().iterable(batch.items())
                .group().intoLists().of(dbConcurrency)
                .onItem().transformToUniAndConcatenate(chunk ->
                    // 3. Write back to the main Work Queue in the DB
                    Multi.createFrom().iterable(chunk)
                        .onItem().transformToUniAndMerge(line ->
                             repo.requeueParams(line.req())
                        )
                        .collect().asList()
                )
                .collect().asList()
                // 4. If successful, advance the file offset (commit)
                .invoke(() -> writeOffsetAtomic(batch.off(), newOffset))
                .replaceWithVoid()
        );
}
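
The drain side is shown above; the append side is simpler. Here is a minimal sketch of what outboxBuffer.offer could look like, assuming a Path field named activeSegment and hypothetical Job accessors. The real implementation also has to handle segment rotation, fsync policy, and serializing concurrent appends.

// Sketch: append one failed job as a JSON line to the active outbox segment
public Uni<Void> offer(Job job, Throwable err) {
    return Uni.createFrom().item(() -> {
            String line = new JsonObject()
                    .put("sourceTable", job.sourceTable())
                    .put("sourcePk", job.sourcePk())
                    .put("payload", job.payload())
                    .put("attemptCount", job.attemptCount())
                    .put("error", String.valueOf(err))
                    .encode() + "\n";
            try {
                // Append-only write; the drainer keeps its own read offset per segment
                Files.writeString(activeSegment, line, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return (Void) null;
        })
        // File I/O is blocking, so run it off the event loop, just as drainOnce does with ioOnWorker()
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}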

Observability and Operational Safety

Because all state lives in durable storage:

  • Progress can be queried: You can run SQL queries to see how many jobs are pending, processing, or completed.
  • Failures can be inspected: The last_error column in the processed_log table provides immediate insight into why records are failing.
  • System is auditable: The processed_log table keeps a history of every attempt.
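
Because the state is just rows in PostgreSQL, this visibility is also cheap to automate. As a sketch, a periodic probe could log the queue depth (assuming a reactive client field named client; the interval and the single count query are illustrative):

// Sketch: log how many jobs are currently eligible for processing
@Scheduled(every = "30s")
void logQueueDepth() {
    client.query("SELECT count(*) AS pending FROM work_queue WHERE next_retry_at <= now()")
          .execute()
          .subscribe().with(
              rows -> Log.infof("pending jobs: %d", rows.iterator().next().getLong("pending")),
              err -> Log.warn("queue depth probe failed", err));
}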

Observability and operational safety

Operationally, it behaves like a pipeline, not a fire-and-forget script.

Simulation: Proving the Architecture

Theory is good, but practice is better. To validate this architecture, we built a simulation environment included in the project.

We developed a Mock REST Server that mimics a flaky Keycloak instance. It introduces random delays and HTTP 500 errors, allowing us to observe how the migrator handles chaos.
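
For illustration, a flaky endpoint in that spirit takes only a few lines of Quarkus REST code; the path, delay range, and failure rate below are illustrative, not the actual mock implementation.

// Sketch: a mock import endpoint with random latency and ~10% HTTP 500 responses
@Path("/mock/users")
public class FlakyImportResource {

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    public Uni<Response> importUser(JsonObject user) {
        long delayMs = ThreadLocalRandom.current().nextLong(5, 250);   // random latency
        boolean fail = ThreadLocalRandom.current().nextInt(100) < 10;  // roughly 10% of requests fail

        return Uni.createFrom().item(fail
                        ? Response.serverError().build()                    // HTTP 500
                        : Response.status(Response.Status.CREATED).build()) // HTTP 201
                .onItem().delayIt().by(Duration.ofMillis(delayMs));
    }
}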

Even under simulated heavy load with a 10% error rate, the migrator:

  • Successfully processed all records.
  • Retried failed records with exponential backoff.
  • Never exceeded the configured concurrency limits.
  • Lost zero data.

This simulation proves that the architecture holds up under pressure.

For detailed instructions on how to run this simulation yourself (including setting up the Mock Server and generating synthetic data), please refer to the Simulation Guide in the project's README.

Closing Thoughts

Large-scale migrations are not solved by clever loops or bigger thread pools. They are solved by:

  • Explicit State
  • Durable Boundaries
  • Controlled Concurrency
  • Respect for Downstream Systems

This migrator prioritizes safe, lossless, and restartable execution over raw speed. However, by design, those guarantees do not come at the cost of throughput. The same architecture has allowed us to migrate large-scale datasets into Keycloak efficiently and predictably, as shown in the Proven in Production section below.

At scale, that balance is what really matters.

Proven in Production

This architecture isn't just theory. It was the engine behind a massive migration effort where we successfully moved over 20 million identities to Keycloak with zero data loss.

📖 Read the full case study: How Keymate Migrated 20+ Million Identities to Keycloak

This is Part 2 of the Migration at Scale series.

Talk to the Keymate Team

Planning a large-scale IAM migration? Learn how Keymate helps teams migrate safely without downtime.
