integration-architect
**Master Skill**: Integration & Event Systems Architect. Covers Distributed Transactions (Sagas), Event Sourcing, Kafka/AMQ Streams engineering, CDC (Debezium), CloudEvents, Schema Registry, and Exactly-Once semantics.
- name: integration-architect
- version: 2.0.0
- maturity: stable
- updated: 2026-01-30
- author: payu-platform-team
- requires: [core-banking-engineer]
- tags: [events, kafka, integration, cdc, saga, event-sourcing, debezium]
- related: [core-banking-engineer, data-architect, platform-engineer]
Reference Implementation Patterns
For detailed patterns and historical context on PayU integration, see:
PayU Integration Architect Master Skill
You are the Lead Events & Messaging Architect (AI) for the PayU Platform. You design the nervous system of the bank, ensuring ultra-reliable, high-throughput asynchronous communication between microservices using AMQ Streams (Kafka).
2026 Integration Trends & Standards
- Durable Execution (Temporal): Moving beyond pure Kafka Sagas to stateful workflows for high-stakes business logic.
- KRaft Mode: All PayU Kafka clusters run in KRaft mode without ZooKeeper. Apache Kafka 4.0 removed ZooKeeper support entirely.
- CloudEvents 1.0.2: Strict adherence for cross-cloud and cross-language compatibility.
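- Exactly-Once Semantics (EOS): Default for all financial ledger operations. Kafka transactions cover consume-transform-produce flows inside Kafka. Side effects in databases still need the outbox pattern or idempotent consumers.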
AMQ Streams & Kafka Engineering
1. Producer Configuration (Financial Grade)
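```java
// KafkaProducerConfig.java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrap;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Bootstrap
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap);
        // CRITICAL: idempotence + acks=all are prerequisites for exactly-once semantics
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all in-sync replicas
        // Reliability
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // <= 5 keeps ordering with idempotence
        config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        // Serialization
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(config);
        // Transactions (EOS): Spring derives each producer's transactional.id from this prefix.
        // The prefix must be unique per application instance (here: the pod's HOSTNAME),
        // otherwise replicas fence each other's producers.
        factory.setTransactionIdPrefix("payu-wallet-txn-" + System.getenv("HOSTNAME") + "-");
        return factory;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        // With a transactional factory, send() must run inside a Kafka transaction
        // (executeInTransaction(), a KafkaTransactionManager, or a listener container).
        return new KafkaTemplate<>(producerFactory());
    }
}
```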
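`enable.idempotence=true` prevents duplicates from producer retries because of a check on the broker. Each producer gets a producer ID, and every batch sent to a partition carries a sequence number. The partition leader appends only the next expected sequence.

The class below is a simplified single-partition model of that check, written for illustration. `SequenceCheckingLog` is a hypothetical name, not a Kafka class. The real broker remembers the last five batches per producer and reports gaps as `OutOfOrderSequenceException`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified single-partition model of Kafka's idempotent-producer sequence check. */
final class SequenceCheckingLog {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final Map<Long, Integer> lastSequenceByProducer = new HashMap<>();
    private final List<String> records = new ArrayList<>();

    Result append(long producerId, int sequence, String record) {
        int last = lastSequenceByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;    // retry of a batch already written: ack it, don't append
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER; // gap: an earlier batch never arrived
        }
        lastSequenceByProducer.put(producerId, sequence);
        records.add(record);
        return Result.APPENDED;
    }

    List<String> records() {
        return List.copyOf(records);
    }
}
```

Suppose an ack is lost and the producer resends sequence 0. The broker absorbs the resend as a duplicate, so no second debit record is written.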
2. Consumer Configuration (Read Committed)
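```java
// KafkaConsumerConfig.java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrap;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrap);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "wallet-service-group");
        // CRITICAL: Only read committed transactions (aborted EOS writes are skipped)
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Offset management: the listener container commits, never the client
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Deserialization: ErrorHandlingDeserializer hands poison pills to the error handler
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.payu.*"); // assumed event package root
        // Performance tuning
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    // Named kafkaListenerContainerFactory so @KafkaListener picks it up by default
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Manual ack for reliability: listeners must call Acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        // Concurrency based on partitions: threads beyond the partition count sit idle
        factory.setConcurrency(3);
        return factory;
    }
}
```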
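This setup combines `read_committed`, manual acks and `enable.auto.commit=false`. If the consumer crashes between processing a record and committing its offset, the record is redelivered. Delivery to the listener is therefore at-least-once, and handlers must be idempotent.

Below is a minimal sketch under two assumptions:
- The CloudEvents `id` is the deduplication key.
- An in-memory set stands in for a processed-events table. In production, that table would be written in the same database transaction as the business change.

`IdempotentHandler` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical idempotent wrapper: applies each event ID at most once. */
final class IdempotentHandler<T> {
    private final Set<String> processedIds = new HashSet<>(); // stand-in for a processed_events table
    private final Consumer<T> businessLogic;

    IdempotentHandler(Consumer<T> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /** Returns true if the event was applied, false if it was a redelivery. */
    boolean handle(String eventId, T payload) {
        if (processedIds.contains(eventId)) {
            return false; // already applied: just acknowledge the offset again
        }
        businessLogic.accept(payload);
        processedIds.add(eventId); // in production: same DB transaction as businessLogic
        return true;
    }
}
```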
3. Topic Naming Convention
```text
payu.<domain>.<event-type>.<version>

Examples:
- payu.wallet.transfer-initiated.v1
- payu.wallet.transfer-completed.v1
- payu.wallet.transfer-failed.v1
- payu.transaction.payment-received.v1
- payu.kyc.verification-completed.v1

DLQ Topics:
- payu.wallet.transfer-initiated.v1.dlq
```
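A single regular expression can enforce the convention in CI or in a topic-provisioning step. The pattern below is an assumption derived from the examples above:
- a lowercase kebab-case domain and event type,
- a `v<N>` version,
- an optional `.dlq` suffix.

`TopicNames` is a hypothetical helper.

```java
import java.util.regex.Pattern;

/** Checks topic names against payu.<domain>.<event-type>.<version>[.dlq]. */
final class TopicNames {
    private static final Pattern TOPIC = Pattern.compile(
            "^payu\\.[a-z][a-z0-9-]*\\.[a-z][a-z0-9-]*\\.v[1-9][0-9]*(\\.dlq)?$");

    private TopicNames() {}

    static boolean isValid(String topic) {
        return TOPIC.matcher(topic).matches();
    }

    /** Derives the dead-letter topic for a valid (non-DLQ) event topic. */
    static String dlqFor(String topic) {
        if (!isValid(topic) || topic.endsWith(".dlq")) {
            throw new IllegalArgumentException("Not an event topic: " + topic);
        }
        return topic + ".dlq";
    }
}
```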
4. Topic Configuration & Deep-Dive Internals
This topic configuration is designed for high throughput with uncompromising data durability.
```yaml
# Strimzi KafkaTopic CR
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: payu.wallet.transfer-initiated.v1
  labels:
    strimzi.io/cluster: payu-kafka
spec:
  partitions: 12 # Formula: Max(Consumer Group Parallelism) * Buffer
  replicas: 3    # Standard HA (survives 1 broker failure)
  config:
    # DURABILITY
    min.insync.replicas: 2 # Mandatory! With acks=all, producers fail if only 1 replica is in sync.
    unclean.leader.election.enable: "false" # Never promote an out-of-sync (lagging) replica.
    # RETENTION
    retention.ms: 604800000 # 7 days
    retention.bytes: -1     # No size limit (bounded by storage)
    # PERFORMANCE & BATCHING
    segment.bytes: 1073741824  # 1 GiB log segments
    max.message.bytes: 1048576 # 1 MiB cap (prevents payload bloat)
    compression.type: lz4      # Good CPU/size balance; broker recompresses batches sent with another codec
    # CLEANUP (compact for state, delete for events)
    cleanup.policy: delete
```
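For `acks=all` producers, `replicas` and `min.insync.replicas` combine through simple arithmetic. A write succeeds while at least `min.insync.replicas` replicas are in sync. The topic therefore tolerates `replicas - min.insync.replicas` broker failures before producers start getting `NotEnoughReplicas` errors. A small sketch (`IsrMath` is an illustrative name):

```java
/** Availability arithmetic for acks=all producers. */
final class IsrMath {
    private IsrMath() {}

    /** Broker failures tolerated before acks=all writes are rejected. */
    static int tolerableFailures(int replicas, int minInsyncReplicas) {
        if (minInsyncReplicas < 1 || minInsyncReplicas > replicas) {
            throw new IllegalArgumentException("need 1 <= min.insync.replicas <= replicas");
        }
        return replicas - minInsyncReplicas;
    }

    /** True if an acks=all write is accepted with the given number of in-sync replicas. */
    static boolean acceptsWrite(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }
}
```

With the values above (3 replicas, `min.insync.replicas: 2`), one broker can be down for maintenance. A second failure blocks `acks=all` writes instead of risking data loss.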
Partitioning Strategy Calculator
Don't guess the partition count. Use this rule of thumb:
$$ P = \max\left(\frac{T}{T_p}, \frac{T}{T_c}\right) $$
- $T$: target throughput of the topic (MB/s)
- $T_p$: measured producer throughput per partition (MB/s)
- $T_c$: measured consumer throughput per partition, i.e. per consumer instance (MB/s)
PayU scenario:
- Target: 50,000 TPS (transactions per second)
- Avg message size: 1 KB
- Throughput: $T$ = 50,000 × 1 KB ≈ 50 MB/s
- Single consumer speed: $T_c$ = 10 MB/s (heavy processing)
- Result: $T / T_c$ = 50 / 10 = 5, so at least 5 parallel consumers are needed. Set 6 or 12 partitions to leave room for scaling.
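A small helper reproduces the scenario above. This sketch makes two assumptions:
- The per-partition throughput figures come from your own producer and consumer benchmarks.
- An optional headroom factor is added for growth.

`PartitionCalculator` is a hypothetical name.

```java
/** Partition count heuristic: max(T / T_p, T / T_c), scaled by headroom and rounded up. */
final class PartitionCalculator {
    private PartitionCalculator() {}

    static int partitions(double targetMBps, double producerMBpsPerPartition,
                          double consumerMBpsPerPartition, double headroom) {
        if (targetMBps <= 0 || producerMBpsPerPartition <= 0
                || consumerMBpsPerPartition <= 0 || headroom < 1.0) {
            throw new IllegalArgumentException("throughputs must be > 0 and headroom >= 1");
        }
        double forProducers = targetMBps / producerMBpsPerPartition;
        double forConsumers = targetMBps / consumerMBpsPerPartition;
        return (int) Math.ceil(Math.max(forProducers, forConsumers) * headroom);
    }
}
```

The scenario does not give a producer-side figure. Assuming 25 MB/s per partition for producers, `partitions(50, 25, 10, 1.0)` yields the consumer-bound minimum of 5. A headroom factor of 2.0 doubles that to 10.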
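Poison Pill Handling Strategy
A malformed message (e.g. invalid JSON) fails deserialization on every poll. It can stall its partition forever.
Use `ErrorHandlingDeserializer`, delegating to `JsonDeserializer`:
```properties
# application.properties (applies to Spring Boot's auto-configured consumer factory;
# a custom ConsumerFactory bean must set the equivalent keys in its own config map)
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
```
Dead Letter Publishing (DLT) with non-blocking retries:
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.retry.annotation.Backoff;

// 3 attempts in total (original delivery + 2 retries), then the record is dead-lettered
@RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 1000, multiplier = 2.0),
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
        retryTopicSuffix = ".retry", // payu.wallet.transfer-initiated.v1.retry-0, .retry-1
        dltTopicSuffix = ".dlq"      // matches the payu.<domain>.<event-type>.<version>.dlq convention
)
@KafkaListener(topics = "payu.wallet.transfer-initiated.v1")
public void listen(TransferEvent event) {
    // Process; an exception thrown here triggers the retry/DLT flow
}
```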
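With `attempts = "3"`, the first delivery counts as an attempt, so a failing record gets two retries before it is dead-lettered. An exponential `@Backoff` multiplies the delay each time.

The arithmetic can be sketched without Spring. The `maxDelayMs` cap mirrors the idea of `@Backoff`'s `maxDelay` attribute. `BackoffSchedule` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Delays before each retry for attempts/delay/multiplier-style exponential backoff. */
final class BackoffSchedule {
    private BackoffSchedule() {}

    static List<Long> retryDelaysMs(int attempts, long initialDelayMs, double multiplier, long maxDelayMs) {
        List<Long> delays = new ArrayList<>();
        double delay = initialDelayMs;
        // attempts includes the first delivery, so there are attempts - 1 retries
        for (int retry = 0; retry < attempts - 1; retry++) {
            delays.add(Math.min((long) delay, maxDelayMs));
            delay *= multiplier;
        }
        return delays;
    }
}
```

For the listener above, the first retry runs about 1 s after the original failure and the second about 2 s after that. The record then goes to the dead-letter topic.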
CloudEvents Standard
Event Envelope Structure
```json
{
  "specversion": "1.0",
  "id": "evt-550e8400-e29b-41d4-a716-446655440000",
  "source": "payu://wallet-service/wallets/wallet-123",
  "type": "com.payu.wallet.transfer.completed.v1",
  "datacontenttype": "application/json",
  "time": "2026-01-30T10:45:00.000Z",
  "subject": "wallet-123",
  "data": {
    "transferId": "txn-456",
    "fromWalletId": "wallet-123",
    "toWalletId": "wallet-789",
    "amount": {
      "value": 500000,
      "currency": "IDR"
    },
    "status": "COMPLETED",
    "completedAt": "2026-01-30T10:45:00.000Z"
  },
  "payutracecontext": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
  "payucorrelationid": "corr-abc-123"
}
```
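The CloudEvents 1.0 spec requires the `id`, `source`, `specversion` and `type` attributes. It restricts attribute names, including extensions such as `payutracecontext`, to lowercase ASCII letters and digits, and recommends at most 20 characters.

Below is a stdlib-only envelope check along those lines, written as a sketch. `EnvelopeCheck` is hypothetical, and it treats the 20-character recommendation as a hard rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Validates CloudEvents 1.0 attribute names and required attributes of an envelope. */
final class EnvelopeCheck {
    private static final Pattern NAME = Pattern.compile("[a-z0-9]{1,20}");
    private static final List<String> REQUIRED = List.of("id", "source", "specversion", "type");

    private EnvelopeCheck() {}

    /** Returns the problems found; an empty list means the envelope passed. */
    static List<String> problems(Map<String, Object> envelope) {
        List<String> problems = new ArrayList<>();
        for (String required : REQUIRED) {
            if (!envelope.containsKey(required)) {
                problems.add("missing required attribute: " + required);
            }
        }
        for (String name : envelope.keySet()) {
            if (!NAME.matcher(name).matches()) {
                problems.add("invalid attribute name: " + name);
            }
        }
        if (!"1.0".equals(envelope.get("specversion"))) {
            problems.add("specversion must be \"1.0\"");
        }
        return problems;
    }
}
```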
Java CloudEvents Implementation
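```java
// CloudEvent builder (TransferEventFactory is an illustrative class name)
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

public class TransferEventFactory {
    private final ObjectMapper objectMapper;

    public TransferEventFactory(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public CloudEvent createTransferEvent(TransferCompleted transfer, String traceContext,
                                          String correlationId) throws JsonProcessingException {
        return CloudEventBuilder.v1()
                .withId("evt-" + UUID.randomUUID())
                .withSource(URI.create("payu://wallet-service/wallets/" + transfer.getFromWalletId()))
                .withType("com.payu.wallet.transfer.completed.v1")
                .withDataContentType("application/json")
                .withTime(OffsetDateTime.now())
                .withSubject(transfer.getFromWalletId())
                .withData(objectMapper.writeValueAsBytes(transfer))
                // Extension names: lowercase letters and digits only (CloudEvents 1.0)
                .withExtension("payutracecontext", traceContext)
                .withExtension("payucorrelationid", correlationId)
                .build();
    }
}
```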
Schema Registry (Apicurio)
Avro Schema Definition
```json
{
  "type": "record",
  "name": "TransferCompleted",
  "namespace": "com.payu.wallet.events",
  "doc": "Event emitted when a wallet transfer completes successfully",
  "fields": [
    {"name": "transferId", "type": "string"},
    {"name": "fromWalletId", "type": "string"},
    {"name": "toWalletId", "type": "string"},
    {
      "name": "amount",
      "type": {
        "type": "record",
        "name": "Money",
        "fields": [
          {"name": "value", "type": "long", "doc": "Amount in smallest unit (cents)"},
          {"name": "currency", "type": "string", "default": "IDR"}
        ]
      }
    },
    {"name": "status", "type": {"type": "enum", "name": "TransferStatus", "symbols": ["COMPLETED", "REVERSED"]}},
    {"name": "completedAt", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
```
Schema Compatibility Rules
```yaml
# apicurio-registry-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: apicurio-registry-config
data:
  # Default global rules (Apicurio Registry 2.x property names)
  registry.rules.global.validity: FULL
  registry.rules.global.compatibility: BACKWARD_TRANSITIVE
  # Schema groups
  # payu-events: BACKWARD (consumers on the new schema can read data written with older schemas)
  # payu-commands: FORWARD (consumers still on the old schema can read data written with the new one)
```
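BACKWARD and FORWARD can be approximated for Avro records at the field level. A reader can fill a field that is missing from the writer's data only if the reader's schema gives that field a default.

The sketch below models a schema as a map from field name to "has a default". It ignores type changes, aliases and nested records, so it illustrates the rule but does not replace the registry's checker. `CompatCheck` is a hypothetical name.

```java
import java.util.List;
import java.util.Map;

/** Field-level approximation of Avro schema compatibility (ignores types and aliases). */
final class CompatCheck {
    private CompatCheck() {}

    /** reader/writer map field name -> whether the field declares a default. */
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            if (!writer.containsKey(field.getKey()) && !field.getValue()) {
                return false; // reader needs a value the writer never wrote and has no default
            }
        }
        return true; // writer-only fields are simply skipped by the reader
    }

    /** BACKWARD: the new schema can read data written with the previous one. */
    static boolean backward(Map<String, Boolean> previous, Map<String, Boolean> next) {
        return canRead(next, previous);
    }

    /** FORWARD: the previous schema can read data written with the new one. */
    static boolean forward(Map<String, Boolean> previous, Map<String, Boolean> next) {
        return canRead(previous, next);
    }

    /** BACKWARD_TRANSITIVE: the new schema can read data from every earlier version. */
    static boolean backwardTransitive(List<Map<String, Boolean>> history, Map<String, Boolean> next) {
        return history.stream().allMatch(old -> canRead(next, old));
    }
}
```

In practice, adding a field with a default is both BACKWARD and FORWARD compatible. Adding a required field without a default breaks BACKWARD.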
Transactional Outbox Pattern
1. Outbox Table Schema
```sql
CREATE TABLE outbox_events (
    id             UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB        NOT NULL,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    published_at   TIMESTAMPTZ,
    -- For ordering
    sequence_num   BIGSERIAL
);

-- Partial index for a polling relay that selects unpublished rows.
-- Debezium does not poll: it reads inserts from the WAL, so with CDC this index
-- (and published_at) only matters if a polling fallback exists.
CREATE INDEX idx_outbox_unpublished
    ON outbox_events (created_at)
    WHERE published_at IS NULL;
```
2. Transactional Write Pattern
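```java
// WalletService.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@RequiredArgsConstructor
public class WalletService {
    private final WalletRepository walletRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public void transfer(TransferRequest request) {
        // 1. Update wallet balance
        Wallet fromWallet = walletRepository.findById(request.getFromWalletId())
                .orElseThrow(WalletNotFoundException::new);
        fromWallet.debit(request.getAmount());
        walletRepository.save(fromWallet);
        // 2. Write to outbox (same transaction!)
        OutboxEvent event = OutboxEvent.builder()
                .aggregateType("Wallet")
                .aggregateId(request.getFromWalletId())
                .eventType("TransferInitiated")
                .payload(toJson(request))
                .build();
        outboxRepository.save(event);
        // ✅ Both succeed or both fail (ACID)
    }

    private String toJson(Object value) {
        try {
            return objectMapper.writeValueAsString(value);
        } catch (JsonProcessingException e) {
            // Rethrow unchecked: @Transactional rolls back only on unchecked exceptions by default,
            // so a checked exception here would commit the debit without its event
            throw new IllegalStateException("Cannot serialize outbox payload", e);
        }
    }
}
```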
3. Debezium Outbox Connector
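```json
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "wallet-db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${env:DB_PASSWORD}",
    "database.dbname": "wallet",
    "topic.prefix": "payu-wallet-outbox",
    "plugin.name": "pgoutput",
    "slot.name": "outbox_slot",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.fields.additional.placement": "event_type:header:eventType",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "payu.${routedByValue}.events"
  }
}
```
The `EventRouter` defaults expect columns named `aggregatetype` and `aggregateid`. In this table they are named `aggregate_type` and `aggregate_id`, so `route.by.field` and `table.field.event.key` must point at those columns. `event_type` is forwarded as an `eventType` header. `topic.prefix` is mandatory from Debezium 2.0 onward, even though the router renames the topic.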
Saga Orchestration
1. Transfer Saga State Machine
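```java
// TransferSagaState.java
public enum TransferSagaState {
    INITIATED,
    SOURCE_DEBITED,
    DESTINATION_CREDITED,
    COMPLETED,
    // Compensation states
    DEBIT_COMPENSATION_PENDING,
    CREDIT_COMPENSATION_PENDING,
    COMPENSATED,
    FAILED
}

// TransferSaga.java
import java.math.BigDecimal;
import java.time.Instant;

import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import jakarta.persistence.Version;
import lombok.Getter;
import lombok.Setter;

@Entity
@Table(name = "transfer_sagas")
@Getter
@Setter // accessors used by the orchestrator (Lombok, as with the builders in this skill)
public class TransferSaga {
    @Id
    private String sagaId;
    @Enumerated(EnumType.STRING)
    private TransferSagaState state;
    private String fromWalletId;
    private String toWalletId;
    private BigDecimal amount;
    private String currency;
    // Audit
    @Version // optimistic locking: concurrent handlers for one saga cannot both commit
    private Long version;
    private Instant createdAt;
    private Instant updatedAt;
    // Compensation data
    private String debitTransactionId;
    private String creditTransactionId;
}
```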
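The enum above only names the states. The allowed transitions are what stop duplicate or out-of-order events from corrupting a saga.

Below is a sketch of an explicit transition table. The transitions are an assumption: the happy path runs INITIATED → SOURCE_DEBITED → DESTINATION_CREDITED → COMPLETED, and the compensation path ends in COMPENSATED, or FAILED when manual intervention is needed. `SagaTransitions` is hypothetical, and its nested `State` mirrors `TransferSagaState` so the sketch compiles on its own.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Explicit transition table for the transfer saga (transitions are an assumption). */
final class SagaTransitions {
    /** Mirrors TransferSagaState so the sketch compiles on its own. */
    enum State {
        INITIATED, SOURCE_DEBITED, DESTINATION_CREDITED, COMPLETED,
        DEBIT_COMPENSATION_PENDING, CREDIT_COMPENSATION_PENDING, COMPENSATED, FAILED
    }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.INITIATED, EnumSet.of(State.SOURCE_DEBITED, State.FAILED));
        ALLOWED.put(State.SOURCE_DEBITED,
                EnumSet.of(State.DESTINATION_CREDITED, State.DEBIT_COMPENSATION_PENDING));
        ALLOWED.put(State.DESTINATION_CREDITED,
                EnumSet.of(State.COMPLETED, State.CREDIT_COMPENSATION_PENDING));
        ALLOWED.put(State.CREDIT_COMPENSATION_PENDING,
                EnumSet.of(State.DEBIT_COMPENSATION_PENDING, State.FAILED));
        ALLOWED.put(State.DEBIT_COMPENSATION_PENDING, EnumSet.of(State.COMPENSATED, State.FAILED));
        // Terminal states accept no further transitions
        ALLOWED.put(State.COMPLETED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.COMPENSATED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.FAILED, EnumSet.noneOf(State.class));
    }

    private SagaTransitions() {}

    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }
}
```

A handler would call `canTransition(current, next)` before saving and treat `false` as a duplicate or stale event. This generalises the `INITIATED` guard in `handleDebitCompleted`.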
2. Saga Orchestrator
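```java
// TransferSagaOrchestrator.java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@RequiredArgsConstructor
public class TransferSagaOrchestrator {
    private final TransferSagaRepository sagaRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public void handleDebitCompleted(DebitCompletedEvent event) {
        TransferSaga saga = sagaRepository.findById(event.getSagaId())
                .orElseThrow(SagaNotFoundException::new);
        // Idempotency guard: ignore duplicate or out-of-order events
        if (saga.getState() != TransferSagaState.INITIATED) {
            log.warn("Ignoring DebitCompleted for saga {} in state {}", saga.getSagaId(), saga.getState());
            return;
        }
        // Store compensation data
        saga.setDebitTransactionId(event.getTransactionId());
        saga.setState(TransferSagaState.SOURCE_DEBITED);
        sagaRepository.save(saga);
        // Proceed to credit destination.
        // NOTE: this send is a dual write (DB commit and Kafka send are not atomic);
        // route commands through the outbox table in production.
        CreditCommand command = CreditCommand.builder()
                .sagaId(saga.getSagaId())
                .walletId(saga.getToWalletId())
                .amount(saga.getAmount())
                .build();
        kafkaTemplate.send("payu.wallet.commands.v1", saga.getSagaId(), command); // key = sagaId keeps order
    }

    @Transactional
    public void handleCreditFailed(CreditFailedEvent event) {
        TransferSaga saga = sagaRepository.findById(event.getSagaId())
                .orElseThrow(SagaNotFoundException::new);
        // Only a debited saga can be compensated; duplicates are ignored
        if (saga.getState() != TransferSagaState.SOURCE_DEBITED) {
            log.warn("Ignoring CreditFailed for saga {} in state {}", saga.getSagaId(), saga.getState());
            return;
        }
        // Start compensation
        saga.setState(TransferSagaState.DEBIT_COMPENSATION_PENDING);
        sagaRepository.save(saga);
        // Compensate: reverse the debit (same dual-write caveat as above)
        ReversalCommand reversal = ReversalCommand.builder()
                .sagaId(saga.getSagaId())
                .originalTransactionId(saga.getDebitTransactionId())
                .reason("Credit failed: " + event.getReason())
                .build();
        kafkaTemplate.send("payu.wallet.reversals.v1", saga.getSagaId(), reversal);
    }
}
```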
3. Compensation Table
| Step | Action | Compensation |
|---|---|---|
| 1 | Debit Source Wallet | Credit Source Wallet (Reversal) |
| 2 | Credit Destination Wallet | Debit Destination Wallet (Reversal) |
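The table implies an ordering rule: compensations run only for steps that completed, and they run in reverse order.

Below is a minimal in-memory sketch of that rule. `CompensatingSequence` and its `Step` record are hypothetical; real compensations would be reversal commands or Temporal activities.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Runs steps in order; on failure, compensates completed steps in reverse order. */
final class CompensatingSequence {
    record Step(String name, Runnable action, Runnable compensation) {}

    private CompensatingSequence() {}

    /** Returns true if every step succeeded, false if compensation ran. */
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action().run();
                completed.push(step); // only completed steps are compensated
            } catch (RuntimeException e) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation().run(); // last completed step first
                }
                return false;
            }
        }
        return true;
    }
}
```

If "Credit Destination Wallet" fails, only "Debit Source Wallet" is reversed. The destination was never credited, so there is nothing to debit back.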
4. Durable Execution (Temporal)
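For complex, long-running sagas (e.g. cross-border transfers taking 2 days), PayU uses Temporal. Temporal persists workflow state and retries failed activities according to their retry policy. It also supports durable timers that can wait days or longer. Compensation is still explicit workflow code.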
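```java
// TransferWorkflowImpl.java (Temporal Java SDK)
import io.temporal.failure.ActivityFailure;
import io.temporal.workflow.Saga;

// walletActivity, bankActivity, notificationActivity: activity stubs created with
// Workflow.newActivityStub(...); Temporal retries them per their RetryOptions.
public void executeTransfer(TransferRequest request) {
    Saga saga = new Saga(new Saga.Options.Builder().build());
    try {
        walletActivity.debit(request.from(), request.amount());
        // Register the reversal only after the debit succeeded (never credit an un-debited wallet)
        saga.addCompensation(walletActivity::credit, request.from(), request.amount());
        bankActivity.sendWire(request.to(), request.amount());
    } catch (ActivityFailure e) {
        saga.compensate(); // runs registered compensations in reverse order
        throw e;
    }
    // A failed notification must not reverse a wire that already went out
    notificationActivity.sendSuccess(request.user());
}
```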
CDC with Debezium
Full CDC Configuration
```json
{
  "name": "wallet-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "wallet-db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${env:DB_PASSWORD}",
    "database.dbname": "wallet",
    "topic.prefix": "payu-wallet",
    "plugin.name": "pgoutput",
    "slot.name": "wallet_cdc_slot",
    "publication.name": "wallet_publication",
    "table.include.list": "public.wallets,public.ledger_entries",
    "column.exclude.list": "public.wallets.encrypted_data",
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "payu-wallet\\.public\\.(.*)",
    "transforms.route.replacement": "payu.wallet.cdc.$1",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,source.ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "heartbeat.interval.ms": 10000,
    "snapshot.mode": "initial"
  }
}
```
Since Debezium 2.0, `topic.prefix` replaces `database.server.name` and is mandatory. The `RegexRouter` pattern must start with the same prefix, with dots escaped so they match literally.
Consumer Lag Monitoring
Prometheus Metrics
```yaml
# prometheus/kafka-alerts.yaml (lag metrics from Strimzi's Kafka Exporter)
groups:
  - name: kafka-consumer-alerts
    rules:
      - alert: KafkaConsumerLagHigh
        expr: sum by (consumergroup, topic) (kafka_consumergroup_lag) > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag is high"
          description: "Consumer group {{ $labels.consumergroup }} has lag {{ $value }} on topic {{ $labels.topic }}"
      - alert: KafkaConsumerLagCritical
        expr: sum by (consumergroup, topic) (kafka_consumergroup_lag) > 100000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Kafka consumer lag is critical"
          description: "Consumer group {{ $labels.consumergroup }} has lag {{ $value }} on topic {{ $labels.topic }}"
```
Grafana Dashboard Query
```promql
# Consumer lag by group
sum by (consumergroup, topic) (kafka_consumergroup_lag)
# Messages produced per second, by topic
sum by (topic) (rate(kafka_topic_partition_current_offset[5m]))
# Consumer throughput
rate(kafka_consumer_fetch_manager_records_consumed_total[5m])
```
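For reference, the lag these queries aggregate is measured per partition. It is the partition's log-end offset minus the group's committed offset, summed per group and topic.

A small sketch of that arithmetic follows. `LagCalculator` is hypothetical. Treating a partition with no committed offset as starting at 0 is an assumption: real behaviour depends on `auto.offset.reset`.

```java
import java.util.Map;

/** Consumer lag arithmetic: log-end offset minus committed offset, per partition. */
final class LagCalculator {
    private LagCalculator() {}

    static long partitionLag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    /** Total lag of one group on one topic; missing commits are assumed to start at offset 0. */
    static long topicLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> partition : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(partition.getKey(), 0L);
            total += partitionLag(partition.getValue(), committed);
        }
        return total;
    }
}
```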
Event Systems Checklist
Producer
- `acks=all` configured for financial data
- Idempotent producer enabled
- Transactional ID set for EOS
- Retry and timeout configured
Consumer
- `isolation.level=read_committed` configured
- Manual acknowledgment implemented
- DLQ configured for poison pills
- Consumer lag monitored
Events
- CloudEvents spec followed
- Schema registered in Apicurio
- Backward compatibility verified
- Message size optimized
Sagas
- Every step has compensation defined
- Saga state persisted in database
- Timeout handling implemented
- Idempotency keys used
References
Local Reference Files
| Category | Topic | File |
|---|---|---|
| Broker Selection | Kafka vs Pulsar vs Redpanda comparison | broker-selection.md |
| Stream Processing | Flink vs Spark vs Kafka Streams | processor-selection.md |
| Delivery | At-least-once, exactly-once patterns | delivery-guarantees.md |
| Exactly-Once | Transactional processing | exactly-once.md |
| Event Sourcing | Event store patterns | event-sourcing.md |
| CDC | Debezium integration | cdc-patterns.md |
| Error Handling | DLQ, retries, backpressure | error-handling.md |
| Sagas | Choreography vs Orchestration patterns | saga_patterns.md |
| Task Queues | BullMQ for TypeScript background jobs | bullmq.md |
| Task Queues | Celery for Python background jobs | celery.md |
| Workflows | Temporal for durable execution & sagas | temporal-workflows.md |
| Messaging | NATS for cloud-native request-reply | nats.md |
| Messaging | RabbitMQ for complex routing | rabbitmq.md |
| Messaging | Redis Streams for simple job queues | redis-streams.md |
| Java | Kafka Java client patterns | java-patterns.md |
| TypeScript | KafkaJS patterns | typescript-patterns.md |
| Python | confluent-kafka-python patterns | python-patterns.md |
| Go | kafka-go patterns | go-patterns.md |
External Documentation
- Apache Kafka Documentation
- Strimzi Kafka Operator
- Debezium Documentation
- CloudEvents Specification
- Apicurio Registry
- Saga Pattern (Microsoft)
- Transactional Outbox (Microservices.io)
- Event Sourcing (Martin Fowler)
- Kafka Exactly-Once Semantics
- Temporal Documentation
P19 Audit Status: Integration Gaps (Feb 2026)
CRITICAL: Read `.agent/context/P19-AUDIT-STATUS.md` for full details.
Event-First Architecture: ⚠️ PARTIAL. Shared starters exist but are dead code.
Dead Code Alert: Shared Starters NOT Integrated
| Starter | Status | Built | Consumers | Impact |
|---|---|---|---|---|
| events-starter (CloudEvents) | 🔴 Dead | ✅ | 0 services | Services publish raw Kafka, not CloudEvents |
| outbox-starter (Transactional Outbox) | 🔴 Dead | ✅ | 0 services | Financial txns can lose events during failures |
| saga-starter (Saga Orchestration) | 🔴 Dead | ✅ | 0 services | transaction-service has raw saga logic instead |
What Services Actually Do (vs What Starters Provide)
Current Reality:
- `transaction-service`: Publishes Kafka events directly (no outbox, no CloudEvents envelope)
- `wallet-service`: Publishes Kafka events directly (same issue)
- `lending-service`: No event publishing at all
- `saga-starter`: Full saga orchestration engine built but unused; transaction-service has inline saga logic
What SHOULD Happen (Target State):
- `transaction-service` → Use `outbox-starter` for atomic event publishing
- `wallet-service` → Use `outbox-starter` for balance change events
- `lending-service` → Use `saga-starter` for loan disbursement workflow
- ALL events → Use `events-starter` CloudEvents envelope format
Integration Patterns to Apply
When asked to implement any event publishing:
```java
// ❌ WRONG: current state (direct Kafka, no atomicity)
@Transactional
public Transfer execute(TransferCommand cmd) {
    Transfer t = processTransfer(cmd);
    transferRepo.save(t);
    kafkaTemplate.send("transfers", t); // NOT atomic: a failed send loses the event, a rollback after send publishes a phantom one
    return t;
}

// ✅ CORRECT: use outbox-starter (atomic with the DB transaction)
@Transactional
public Transfer execute(TransferCommand cmd) {
    Transfer t = processTransfer(cmd);
    transferRepo.save(t);
    // Debezium CDC picks up the outbox row after commit and publishes it to Kafka
    outboxRepository.save(OutboxEvent.of("Transfer", t.getId(), "TransferCompleted", t));
    return t;
}
```
Outbox Starter Integration Steps
See `docs/guides/LESSONS.md` § "Transactional Outbox Pattern Integration" for the full guide:
- Add `outbox-starter` dependency to the service `pom.xml`
- Include Flyway migration for the `outbox_events` table
- Inject `OutboxRepository` into application services
- Replace direct `kafkaTemplate.send()` with `outboxRepository.save()`
- Configure Debezium CDC connector for the service's database
- Write integration test with Testcontainers (PostgreSQL + Kafka + Debezium)
Saga Starter Integration Steps
See `docs/guides/LESSONS.md` § "Saga Orchestration Pattern Integration":
- Add `saga-starter` dependency
- Define saga steps as `SagaStep<>` implementations
- Use `SagaOrchestrator` to execute the saga
- Implement compensation logic for each step
- Replace inline saga logic in transaction-service
Last Updated: February 2026 (P19 Audit)