Skip to main content

Event Bus Integration

Goal

Build a consumer application to process platform events from Keymate Event Hub. This guide covers subscribing to event bus topics, deserializing Protobuf event payloads, and implementing reliable message processing with idempotency and retry handling.

Audience

  • Developers building event-driven integrations with Keymate
  • Platform engineers setting up downstream consumers for platform events

Prerequisites

  • Access to the event bus cluster hosting Event Hub topics
  • Connection credentials and Schema Registry URL provided by your integration team
  • Event bus client library with Protobuf support for your programming language
  • Event Hub proto artifact dependency for deserializing EventHubEvent messages

Before You Start

Keymate Event Hub publishes platform events using the Transactional Outbox pattern. This guarantees that events are delivered at least once — your consumer must handle potential duplicates through idempotent processing.

Events are serialized as Protocol Buffers (Protobuf) and validated against schemas in a Schema Registry. Your consumer must use a Protobuf deserializer configured with the Schema Registry URL and the Event Hub proto artifact.

Event Hub Topics

Event Hub publishes events to dedicated event bus topics by event category. Each EventHubType maps to a dedicated topic:

Event CategoryDescription
User Contact ChangesUpdates to user contact information
Employee LifecycleEmployee onboarding, offboarding, and status changes
Organization Registry ChangesChanges from external organization registries
Notary ChangesUpdates from notary systems

Your integration team will provide the specific topic names and access credentials during onboarding.

Event Payload Structure

All Event Hub events use the EventHubEvent Protobuf envelope:

FieldTypeDescription
eventIdString (UUID)Unique identifier for the event — use for idempotency
sourceServiceStringThe Keymate service that generated the event
eventTypeEventHubTypeCategory of the event (maps to the topic)
oneof eventSub-eventThe type-specific event payload

Event Hub validates all events before publishing. Consumers can rely on these fields being present and valid.

Worked Example

In this guide, you will build a consumer that subscribes to an Event Hub topic, deserializes Protobuf messages, and processes events with idempotency and retry handling.

Steps

1. Configure the Consumer

Set up connection properties using credentials provided during onboarding.

consumer.properties
# Connection (values provided during onboarding)
bootstrap.servers=<BOOTSTRAP_SERVERS>
group.id=<CONSUMER_GROUP>

# Protobuf deserialization with Schema Registry
key.deserializer=<STRING_DESERIALIZER>
value.deserializer=<PROTOBUF_DESERIALIZER>
schema.registry.url=<SCHEMA_REGISTRY_URL>
specific.protobuf.value.type=<EVENTHUB_EVENT_CLASS>

# Consumer behavior
auto.offset.reset=earliest
enable.auto.commit=false
warning

Do not use auto.commit=true with Event Hub consumers. Manual offset commits after successful processing ensure at-least-once delivery guarantees are preserved.

2. Subscribe and Poll

Connect to your assigned Event Hub topic and process incoming events.

EventHubConsumer.java
public class EventHubConsumer {

public static void main(String[] args) {
Properties props = loadProperties();

try (var consumer = createConsumer(props)) {

consumer.subscribe(List.of("<EVENT_HUB_TOPIC>"));

while (true) {
var records = consumer.poll(Duration.ofMillis(1000));

for (var record : records) {
processEvent(record.value());
}

consumer.commitSync();
}
}
}

private static void processEvent(EventHubEvent event) {
System.out.printf("EventId: %s, Type: %s, Source: %s%n",
event.getEventId(),
event.getEventType(),
event.getSourceService());

// Handle specific event types via the oneof case
switch (event.getEventCase()) {
case <EMPLOYEE_LIFECYCLE_CASE>:
handleEmployeeLifecycle(event);
break;
case <USER_CONTACT_CASE>:
handleUserContactChange(event);
break;
// Handle other event types...
default:
// Handle unknown event case gracefully
break;
}
}

private static Properties loadProperties() {
// Load from environment or config file
return new Properties();
}
}

3. Implement Idempotent Processing

Event Hub provides at-least-once delivery. Use the eventId field to track processed events and prevent duplicate handling.

IdempotentHandler.java
public class IdempotentHandler {

private final Set<String> processed = ConcurrentHashMap.newKeySet();

public void handleOnce(String eventId, Runnable action) {
if (processed.add(eventId)) {
try {
action.run();
} catch (RuntimeException e) {
processed.remove(eventId);
throw e;
}
}
}
}
tip

Use a persistent store (relational database or distributed cache) for tracking processed event IDs in production environments. The in-memory set shown above is suitable for development only.

4. Handle Failures

Implement retry logic with backoff and dead-letter handling for failed messages.

RetryHandler.java
public class RetryHandler {

private static final int MAX_RETRIES = 3;
private static final long BACKOFF_MS = 100;

public void processWithRetry(EventHubEvent event) {
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
processEvent(event);
return;
} catch (Exception e) {
if (attempt == MAX_RETRIES) {
handleFailure(event, e);
} else {
sleep(BACKOFF_MS * attempt);
}
}
}
}

private void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void handleFailure(EventHubEvent event, Exception error) {
// Log the failure with eventId for troubleshooting
// Send to dead-letter topic or alert operations team
}
}
note

The example above uses linear backoff. For production workloads, consider exponential backoff with jitter to avoid thundering herd problems.

Configuration Reference

PropertyDescriptionRecommended
group.idConsumer group identifierUnique per integration
auto.offset.resetStart position for new groupsearliest
enable.auto.commitAutomatic offset commitsfalse
max.poll.recordsRecords per poll100-500
session.timeout.msSession timeout30000
schema.registry.urlSchema Registry URLRequired for Protobuf

Validation Scenario

Scenario

Subscribe to an Event Hub topic and verify platform events are received and processed correctly.

Expected Result

Consumer logs show received events with valid eventId, eventType, and sourceService fields. Event payloads deserialize successfully based on the oneof event case.

How to Verify

  • Logs: Events printed with eventId, type, and source service
  • Offsets: Consumer group offset advances after processing
  • Idempotency: Reprocessing the same eventId does not duplicate side effects
  • Schema: Events deserialize without Protobuf errors

Troubleshooting

  • No messages received — Verify topic name and consumer group permissions. Check auto.offset.reset if the group is new.

  • Deserialization errors — Confirm Schema Registry URL is correct and accessible. Verify the proto artifact version matches Event Hub's deployment.

  • Schema compatibility errors — Your consumer's proto artifact may be outdated. Update to the latest version that matches Event Hub.

  • Consumer lag — Scale consumer instances or increase max.poll.records. Check for slow processing in event handlers.

  • Duplicate processing — Implement idempotent handling using eventId. Verify your processed-events store is working correctly.

Next Steps

After verifying basic consumption:

  1. Implement business logic handlers for each event type your integration requires
  2. Add monitoring for consumer health, lag, and processing errors
  3. Configure alerting for dead-letter queue accumulation
  4. Review Consumer Contracts for detailed event schemas and validation rules