Event Bus Integration
Goal
Build a consumer application to process platform events from Keymate Event Hub. This guide covers subscribing to event bus topics, deserializing Protobuf event payloads, and implementing reliable message processing with idempotency and retry handling.
Audience
- Developers building event-driven integrations with Keymate
- Platform engineers setting up downstream consumers for platform events
Prerequisites
- Access to the event bus cluster hosting Event Hub topics
- Connection credentials and Schema Registry URL provided by your integration team
- Event bus client library with Protobuf support for your programming language
- Event Hub proto artifact dependency for deserializing EventHubEvent messages
Before You Start
Keymate Event Hub publishes platform events using the Transactional Outbox pattern. This guarantees that events are delivered at least once — your consumer must handle potential duplicates through idempotent processing.
Events are serialized as Protocol Buffers (Protobuf) and validated against schemas in a Schema Registry. Your consumer must use a Protobuf deserializer configured with the Schema Registry URL and the Event Hub proto artifact.
Event Hub Topics
Event Hub publishes events to event bus topics grouped by event category; each EventHubType maps to a dedicated topic:
| Event Category | Description |
|---|---|
| User Contact Changes | Updates to user contact information |
| Employee Lifecycle | Employee onboarding, offboarding, and status changes |
| Organization Registry Changes | Changes from external organization registries |
| Notary Changes | Updates from notary systems |
Your integration team will provide the specific topic names and access credentials during onboarding.
Event Payload Structure
All Event Hub events use the EventHubEvent Protobuf envelope:
| Field | Type | Description |
|---|---|---|
| eventId | String (UUID) | Unique identifier for the event — use for idempotency |
| sourceService | String | The Keymate service that generated the event |
| eventType | EventHubType | Category of the event (maps to the topic) |
| oneof event | Sub-event | The type-specific event payload |
Event Hub validates all events before publishing. Consumers can rely on these fields being present and valid.
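To make the envelope concrete, here is a schema sketch reconstructed from the field table above. The sub-event message names, field numbers, and enum are illustrative assumptions; the authoritative definition lives in the Event Hub proto artifact.

```proto
syntax = "proto3";

// Envelope sketch reconstructed from the field table above.
// Sub-event message names and field numbers are illustrative assumptions.
message EventHubEvent {
  string event_id = 1;        // UUID; use for idempotency
  string source_service = 2;  // originating Keymate service
  EventHubType event_type = 3;
  oneof event {
    EmployeeLifecycleEvent employee_lifecycle = 4;
    UserContactChangeEvent user_contact_change = 5;
    // ... one sub-message per event category
  }
}
```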
Worked Example
In this guide, you will build a consumer that subscribes to an Event Hub topic, deserializes Protobuf messages, and processes events with idempotency and retry handling.
Steps
1. Configure the Consumer
Set up connection properties using credentials provided during onboarding.
```properties
# Connection (values provided during onboarding)
bootstrap.servers=<BOOTSTRAP_SERVERS>
group.id=<CONSUMER_GROUP>

# Protobuf deserialization with Schema Registry
key.deserializer=<STRING_DESERIALIZER>
value.deserializer=<PROTOBUF_DESERIALIZER>
schema.registry.url=<SCHEMA_REGISTRY_URL>
specific.protobuf.value.type=<EVENTHUB_EVENT_CLASS>

# Consumer behavior
auto.offset.reset=earliest
enable.auto.commit=false
```
Do not use enable.auto.commit=true with Event Hub consumers. Committing offsets manually, only after successful processing, preserves the at-least-once delivery guarantee.
2. Subscribe and Poll
Connect to your assigned Event Hub topic and process incoming events.
```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class EventHubConsumer {

    public static void main(String[] args) {
        Properties props = loadProperties();
        try (var consumer = createConsumer(props)) {
            consumer.subscribe(List.of("<EVENT_HUB_TOPIC>"));
            while (true) {
                var records = consumer.poll(Duration.ofMillis(1000));
                for (var record : records) {
                    processEvent(record.value());
                }
                // Commit only after the whole batch has been processed
                consumer.commitSync();
            }
        }
    }

    private static void processEvent(EventHubEvent event) {
        System.out.printf("EventId: %s, Type: %s, Source: %s%n",
                event.getEventId(),
                event.getEventType(),
                event.getSourceService());

        // Handle specific event types via the oneof case
        switch (event.getEventCase()) {
            case <EMPLOYEE_LIFECYCLE_CASE>:
                handleEmployeeLifecycle(event);
                break;
            case <USER_CONTACT_CASE>:
                handleUserContactChange(event);
                break;
            // Handle other event types...
            default:
                // Handle unknown event case gracefully
                break;
        }
    }

    private static Properties loadProperties() {
        // Load from environment or config file
        return new Properties();
    }
}
```
3. Implement Idempotent Processing
Event Hub provides at-least-once delivery. Use the eventId field to track processed events and prevent duplicate handling.
```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentHandler {

    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    public void handleOnce(String eventId, Runnable action) {
        // add() returns false if the eventId has already been processed
        if (processed.add(eventId)) {
            try {
                action.run();
            } catch (RuntimeException e) {
                // Unmark so the event can be retried after a failure
                processed.remove(eventId);
                throw e;
            }
        }
    }
}
```
Use a persistent store (relational database or distributed cache) for tracking processed event IDs in production environments. The in-memory set shown above is suitable for development only.
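One persistent option is to key a unique-constraint insert on the eventId, letting the database reject duplicates atomically. The sketch below defines a small store interface with the development in-memory variant and a hypothetical JDBC-backed one; the table name `processed_events` and its `event_id` column are assumptions for illustration, not part of the Event Hub contract.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Returns true only the first time a given eventId is seen. */
interface ProcessedEventStore {
    boolean markProcessed(String eventId);
}

/** Development-only variant, equivalent to the in-memory set shown above. */
class InMemoryProcessedEventStore implements ProcessedEventStore {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    public boolean markProcessed(String eventId) {
        return seen.add(eventId);
    }
}

/**
 * Production sketch: relies on a UNIQUE constraint on event_id so the
 * database rejects duplicates atomically, even across consumer instances.
 */
class JdbcProcessedEventStore implements ProcessedEventStore {
    private final Connection connection;

    JdbcProcessedEventStore(Connection connection) {
        this.connection = connection;
    }

    public boolean markProcessed(String eventId) {
        String sql = "INSERT INTO processed_events (event_id) VALUES (?)";
        try (PreparedStatement stmt = connection.prepareStatement(sql)) {
            stmt.setString(1, eventId);
            return stmt.executeUpdate() == 1;
        } catch (SQLException duplicateOrError) {
            // A unique-constraint violation means the event was already processed
            return false;
        }
    }
}
```

Plugging either implementation behind the same interface lets handlers call `markProcessed(event.getEventId())` before running side effects, regardless of environment.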
4. Handle Failures
Implement retry logic with backoff and dead-letter handling for failed messages.
```java
public class RetryHandler {

    private static final int MAX_RETRIES = 3;
    private static final long BACKOFF_MS = 100;

    public void processWithRetry(EventHubEvent event) {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                processEvent(event);
                return;
            } catch (Exception e) {
                if (attempt == MAX_RETRIES) {
                    handleFailure(event, e);
                } else {
                    sleep(BACKOFF_MS * attempt);
                }
            }
        }
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void handleFailure(EventHubEvent event, Exception error) {
        // Log the failure with eventId for troubleshooting
        // Send to dead-letter topic or alert operations team
    }
}
```
The example above uses linear backoff. For production workloads, consider exponential backoff with jitter to avoid thundering herd problems.
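One way to compute such delays is "full jitter": pick a random delay between zero and an exponentially growing ceiling. The sketch below is illustrative; the class, base, and cap values are assumptions, not Event Hub requirements.

```java
import java.util.concurrent.ThreadLocalRandom;

public class Backoff {

    private static final long BASE_MS = 100;     // first-attempt ceiling
    private static final long MAX_MS = 10_000;   // overall cap

    /** Full jitter: a random delay in [0, min(MAX_MS, BASE_MS * 2^(attempt-1))]. */
    public static long delayMillis(int attempt) {
        // Cap the shift amount so the exponential term cannot overflow
        long exp = BASE_MS * (1L << Math.min(attempt - 1, 20));
        long ceiling = Math.min(MAX_MS, exp);
        return ThreadLocalRandom.current().nextLong(ceiling + 1);
    }
}
```

Replacing `sleep(BACKOFF_MS * attempt)` with `sleep(Backoff.delayMillis(attempt))` spreads retries from many consumers across time instead of synchronizing them.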
Configuration Reference
| Property | Description | Recommended |
|---|---|---|
| group.id | Consumer group identifier | Unique per integration |
| auto.offset.reset | Start position for new groups | earliest |
| enable.auto.commit | Automatic offset commits | false |
| max.poll.records | Records per poll | 100-500 |
| session.timeout.ms | Session timeout | 30000 |
| schema.registry.url | Schema Registry URL | Required for Protobuf |
Validation Scenario
Scenario
Subscribe to an Event Hub topic and verify platform events are received and processed correctly.
Expected Result
Consumer logs show received events with valid eventId, eventType, and sourceService fields. Event payloads deserialize successfully based on the oneof event case.
How to Verify
- Logs: Events printed with eventId, type, and source service
- Offsets: Consumer group offset advances after processing
- Idempotency: Reprocessing the same eventId does not duplicate side effects
- Schema: Events deserialize without Protobuf errors
Troubleshooting
- No messages received — Verify topic name and consumer group permissions. Check auto.offset.reset if the group is new.
- Deserialization errors — Confirm the Schema Registry URL is correct and accessible. Verify the proto artifact version matches Event Hub's deployment.
- Schema compatibility errors — Your consumer's proto artifact may be outdated. Update to the latest version that matches Event Hub.
- Consumer lag — Scale consumer instances or increase max.poll.records. Check for slow processing in event handlers.
- Duplicate processing — Implement idempotent handling using eventId. Verify your processed-events store is working correctly.
Next Steps
After verifying basic consumption:
- Implement business logic handlers for each event type your integration requires
- Add monitoring for consumer health, lag, and processing errors
- Configure alerting for dead-letter queue accumulation
- Review Consumer Contracts for detailed event schemas and validation rules